diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/mapping/create/CreateMappingRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/mapping/create/CreateMappingRequest.java index 770c648f642..b9221b4e157 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/mapping/create/CreateMappingRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/mapping/create/CreateMappingRequest.java @@ -22,13 +22,16 @@ package org.elasticsearch.action.admin.indices.mapping.create; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.util.Required; +import org.elasticsearch.util.TimeValue; import org.elasticsearch.util.io.Streamable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.concurrent.TimeUnit; import static org.elasticsearch.action.Actions.*; +import static org.elasticsearch.util.TimeValue.*; /** * @author kimchy (Shay Banon) @@ -41,6 +44,8 @@ public class CreateMappingRequest implements ActionRequest, Streamable { private String mappingSource; + private TimeValue timeout = new TimeValue(10, TimeUnit.SECONDS); + CreateMappingRequest() { } @@ -79,7 +84,7 @@ public class CreateMappingRequest implements ActionRequest, Streamable { return indices; } - String mappingType() { + String type() { return mappingType; } @@ -87,7 +92,7 @@ public class CreateMappingRequest implements ActionRequest, Streamable { * The type of the mappings. Not required since it can be defined explicitly within the mapping source. * If it is not defined within the mapping source, then it is required. */ - public CreateMappingRequest mappingType(String mappingType) { + public CreateMappingRequest type(String mappingType) { this.mappingType = mappingType; return this; } @@ -101,6 +106,15 @@ public class CreateMappingRequest implements ActionRequest, Streamable { return this; } + TimeValue timeout() { + return timeout; + } + + public CreateMappingRequest timeout(TimeValue timeout) { + this.timeout = timeout; + return this; + } + @Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException { indices = new String[in.readInt()]; for (int i = 0; i < indices.length; i++) { @@ -110,6 +124,7 @@ public class CreateMappingRequest implements ActionRequest, Streamable { mappingType = in.readUTF(); } mappingSource = in.readUTF(); + timeout = readTimeValue(in); } @Override public void writeTo(DataOutput out) throws IOException { @@ -128,5 +143,6 @@ public class CreateMappingRequest implements ActionRequest, Streamable { out.writeUTF(mappingType); } out.writeUTF(mappingSource); + timeout.writeTo(out); } } \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/mapping/create/TransportCreateMappingAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/mapping/create/TransportCreateMappingAction.java index 33565b78cbe..64a41780b86 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/mapping/create/TransportCreateMappingAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/mapping/create/TransportCreateMappingAction.java @@ -103,7 +103,7 @@ public class TransportCreateMappingAction extends BaseAction listeners = new CopyOnWriteArrayList(); + + @Inject public NodeMappingCreatedAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService) { + super(settings); + this.threadPool = threadPool; + this.transportService = transportService; + this.clusterService = clusterService; + transportService.registerHandler(NodeMappingCreatedTransportHandler.ACTION, new NodeMappingCreatedTransportHandler()); + } + + public void add(Listener listener) { + listeners.add(listener); + } + + public void remove(Listener listener) { + listeners.remove(listener); + } + + public void nodeMappingCreated(final NodeMappingCreatedResponse response) throws ElasticSearchException { + Nodes nodes = clusterService.state().nodes(); + if (nodes.localNodeMaster()) { + threadPool.execute(new Runnable() { + @Override public void run() { + innerNodeIndexCreated(response); + } + }); + } else { + transportService.sendRequest(clusterService.state().nodes().masterNode(), + NodeMappingCreatedTransportHandler.ACTION, response, VoidTransportResponseHandler.INSTANCE); + } + } + + private void innerNodeIndexCreated(NodeMappingCreatedResponse response) { + for (Listener listener : listeners) { + listener.onNodeMappingCreated(response); + } + } + + + public static interface Listener { + void onNodeMappingCreated(NodeMappingCreatedResponse response); + } + + private class NodeMappingCreatedTransportHandler extends BaseTransportRequestHandler { + + static final String ACTION = "cluster/nodeMappingCreated"; + + @Override public NodeMappingCreatedResponse newInstance() { + return new NodeMappingCreatedResponse(); + } + + @Override public void messageReceived(NodeMappingCreatedResponse response, TransportChannel channel) throws Exception { + innerNodeIndexCreated(response); + channel.sendResponse(VoidStreamable.INSTANCE); + } + } + + public static class NodeMappingCreatedResponse implements Streamable { + + private String index; + + private String type; + + private String nodeId; + + private NodeMappingCreatedResponse() { + } + + public NodeMappingCreatedResponse(String index, String type, String nodeId) { + this.index = index; + this.type = type; + this.nodeId = nodeId; + } + + public String index() { + return index; + } + + public String type() { + return type; + } + + public String nodeId() { + return nodeId; + } + + @Override public void writeTo(DataOutput out) throws IOException { + out.writeUTF(index); + out.writeUTF(type); + out.writeUTF(nodeId); + } + + @Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException { + index = in.readUTF(); + type = in.readUTF(); + nodeId = in.readUTF(); + } + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataService.java index 24a68b77cd4..b0ae246f5d4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaDataService.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.metadata; +import com.google.common.collect.Sets; import com.google.inject.Inject; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.cluster.ClusterService; @@ -26,6 +27,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.action.index.NodeIndexCreatedAction; import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction; +import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.strategy.ShardsRoutingStrategy; @@ -44,6 +46,7 @@ import org.elasticsearch.util.settings.ImmutableSettings; import org.elasticsearch.util.settings.Settings; import java.util.Arrays; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -67,14 +70,18 @@ public class MetaDataService extends AbstractComponent { private final NodeIndexDeletedAction nodeIndexDeletedAction; + private final NodeMappingCreatedAction nodeMappingCreatedAction; + @Inject public MetaDataService(Settings settings, ClusterService clusterService, IndicesService indicesService, ShardsRoutingStrategy shardsRoutingStrategy, - NodeIndexCreatedAction nodeIndexCreatedAction, NodeIndexDeletedAction nodeIndexDeletedAction) { + NodeIndexCreatedAction nodeIndexCreatedAction, NodeIndexDeletedAction nodeIndexDeletedAction, + NodeMappingCreatedAction nodeMappingCreatedAction) { super(settings); this.clusterService = clusterService; this.indicesService = indicesService; this.shardsRoutingStrategy = shardsRoutingStrategy; this.nodeIndexCreatedAction = nodeIndexCreatedAction; this.nodeIndexDeletedAction = nodeIndexDeletedAction; + this.nodeMappingCreatedAction = nodeMappingCreatedAction; } public synchronized boolean createIndex(final String index, final Settings indexSettings, TimeValue timeout) throws IndexAlreadyExistsException { @@ -193,7 +200,7 @@ public class MetaDataService extends AbstractComponent { } } - public void addMapping(final String[] indices, String mappingType, final String mappingSource) throws ElasticSearchException { + public boolean addMapping(final String[] indices, String mappingType, final String mappingSource, TimeValue timeout) throws ElasticSearchException { ClusterState clusterState = clusterService.state(); for (String index : indices) { IndexRoutingTable indexTable = clusterState.routingTable().indicesRouting().get(index); @@ -224,6 +231,18 @@ public class MetaDataService extends AbstractComponent { logger.info("Indices [" + Arrays.toString(indices) + "]: Creating mapping [" + mappingType + "] with source [" + mappingSource + "]"); + final CountDownLatch latch = new CountDownLatch(clusterService.state().nodes().size() * indices.length); + final Set indicesSet = Sets.newHashSet(indices); + final String fMappingType = mappingType; + NodeMappingCreatedAction.Listener listener = new NodeMappingCreatedAction.Listener() { + @Override public void onNodeMappingCreated(NodeMappingCreatedAction.NodeMappingCreatedResponse response) { + if (indicesSet.contains(response.index()) && response.type().equals(fMappingType)) { + latch.countDown(); + } + } + }; + nodeMappingCreatedAction.add(listener); + final String mappingTypeP = mappingType; clusterService.submitStateUpdateTask("create-mapping [" + mappingTypeP + "]", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { @@ -238,6 +257,14 @@ public class MetaDataService extends AbstractComponent { return newClusterStateBuilder().state(currentState).metaData(builder).build(); } }); + + try { + return latch.await(timeout.millis(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + return false; + } finally { + nodeMappingCreatedAction.remove(listener); + } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 9d1df932ceb..c1a19ab23d6 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.action.index.NodeIndexCreatedAction; import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction; +import org.elasticsearch.cluster.action.index.NodeMappingCreatedAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; @@ -78,9 +79,12 @@ public class IndicesClusterStateService extends AbstractComponent implements Clu private final NodeIndexDeletedAction nodeIndexDeletedAction; + private final NodeMappingCreatedAction nodeMappingCreatedAction; + @Inject public IndicesClusterStateService(Settings settings, IndicesService indicesService, ClusterService clusterService, ThreadPool threadPool, ShardStateAction shardStateAction, - NodeIndexCreatedAction nodeIndexCreatedAction, NodeIndexDeletedAction nodeIndexDeletedAction) { + NodeIndexCreatedAction nodeIndexCreatedAction, NodeIndexDeletedAction nodeIndexDeletedAction, + NodeMappingCreatedAction nodeMappingCreatedAction) { super(settings); this.indicesService = indicesService; this.clusterService = clusterService; @@ -88,6 +92,7 @@ public class IndicesClusterStateService extends AbstractComponent implements Clu this.shardStateAction = shardStateAction; this.nodeIndexCreatedAction = nodeIndexCreatedAction; this.nodeIndexDeletedAction = nodeIndexDeletedAction; + this.nodeMappingCreatedAction = nodeMappingCreatedAction; } @Override public Lifecycle.State lifecycleState() { @@ -167,6 +172,7 @@ public class IndicesClusterStateService extends AbstractComponent implements Clu logger.debug("Index [" + index + "] Adding mapping [" + mappingType + "], source [" + mappingSource + "]"); } mapperService.add(mappingType, mappingSource); + nodeMappingCreatedAction.nodeMappingCreated(new NodeMappingCreatedAction.NodeMappingCreatedResponse(index, mappingType, event.state().nodes().localNodeId())); } else { DocumentMapper existingMapper = mapperService.documentMapper(mappingType); if (!mappingSource.equals(existingMapper.mappingSource())) { @@ -175,6 +181,7 @@ public class IndicesClusterStateService extends AbstractComponent implements Clu logger.debug("Index [" + index + "] Updating mapping [" + mappingType + "], source [" + mappingSource + "]"); } mapperService.add(mappingType, mappingSource); + nodeMappingCreatedAction.nodeMappingCreated(new NodeMappingCreatedAction.NodeMappingCreatedResponse(index, mappingType, event.state().nodes().localNodeId())); } } } catch (Exception e) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/mapping/create/RestCreateMappingAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/mapping/create/RestCreateMappingAction.java index f6e076e7324..67ac604960c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/mapping/create/RestCreateMappingAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/mapping/create/RestCreateMappingAction.java @@ -27,16 +27,19 @@ import org.elasticsearch.client.Client; import org.elasticsearch.index.mapper.InvalidTypeNameException; import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.rest.*; -import org.elasticsearch.rest.action.support.RestActions; import org.elasticsearch.rest.action.support.RestJsonBuilder; +import org.elasticsearch.util.TimeValue; import org.elasticsearch.util.json.JsonBuilder; import org.elasticsearch.util.settings.Settings; import java.io.IOException; import static org.elasticsearch.ExceptionsHelper.*; +import static org.elasticsearch.client.Requests.*; import static org.elasticsearch.rest.RestRequest.Method.*; import static org.elasticsearch.rest.RestResponse.Status.*; +import static org.elasticsearch.rest.action.support.RestActions.*; +import static org.elasticsearch.util.TimeValue.*; /** * @author kimchy (Shay Banon) @@ -50,10 +53,11 @@ public class RestCreateMappingAction extends BaseRestHandler { } @Override public void handleRequest(final RestRequest request, final RestChannel channel) { - String[] indices = RestActions.splitIndices(request.param("index")); - String mappingType = request.param("type"); - String mappingSource = request.contentAsString(); - client.admin().indices().execCreateMapping(new CreateMappingRequest(indices, mappingType, mappingSource), new ActionListener() { + CreateMappingRequest createMappingRequest = createMappingRequest(splitIndices(request.param("index"))); + createMappingRequest.type(request.param("type")); + createMappingRequest.mappingSource(request.contentAsString()); + createMappingRequest.timeout(TimeValue.parseTimeValue(request.param("timeout"), timeValueSeconds(10))); + client.admin().indices().execCreateMapping(createMappingRequest, new ActionListener() { @Override public void onResponse(CreateMappingResponse result) { try { JsonBuilder builder = RestJsonBuilder.cached(request);