diff --git a/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/GetFieldMappingsRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/GetFieldMappingsRequest.java index 63cafa68043..fd0ce4f3298 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/GetFieldMappingsRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/GetFieldMappingsRequest.java @@ -19,21 +19,85 @@ package org.elasticsearch.action.admin.indices.mapping.get; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.support.master.info.ClusterInfoRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.MasterNodeOperationRequest; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; import java.io.IOException; /** Request the mappings of specific fields */ -public class GetFieldMappingsRequest extends ClusterInfoRequest { +public class GetFieldMappingsRequest extends ActionRequest { + + protected boolean local = false; private String[] fields = Strings.EMPTY_ARRAY; private boolean includeDefaults = false; + private String[] indices = Strings.EMPTY_ARRAY; + private String[] types = Strings.EMPTY_ARRAY; + + private IndicesOptions indicesOptions = IndicesOptions.strict(); + + public GetFieldMappingsRequest() { + + } + + public GetFieldMappingsRequest(GetFieldMappingsRequest other) { + this.local = other.local; + this.includeDefaults = other.includeDefaults; + this.indices = other.indices; + this.types = other.types; + this.indicesOptions = other.indicesOptions; + this.fields = other.fields; + } + + /** + * Indicate whether the receiving node should operate based on local index information or forward requests, + * where needed, to other nodes. If running locally, request will not raise errors if running locally & missing indices. + */ + public GetFieldMappingsRequest local(boolean local) { + this.local = local; + return this; + } + + public boolean local() { + return local; + } + + public GetFieldMappingsRequest indices(String... indices) { + this.indices = indices; + return this; + } + + public GetFieldMappingsRequest types(String... types) { + this.types = types; + return this; + } + + public GetFieldMappingsRequest indicesOptions(IndicesOptions indicesOptions) { + this.indicesOptions = indicesOptions; + return this; + } + + public String[] indices() { + return indices; + } + + public String[] types() { + return types; + } + + public IndicesOptions indicesOptions() { + return indicesOptions; + } + /** @param fields a list of fields to retrieve the mapping for */ public GetFieldMappingsRequest fields(String... fields) { this.fields = fields; @@ -62,6 +126,14 @@ public class GetFieldMappingsRequest extends ClusterInfoRequest { +public class GetFieldMappingsRequestBuilder extends ActionRequestBuilder { public GetFieldMappingsRequestBuilder(InternalGenericClient client, String... indices) { super(client, new GetFieldMappingsRequest().indices(indices)); } + public GetFieldMappingsRequestBuilder setIndices(String... indices) { + request.indices(indices); + return this; + } + + public GetFieldMappingsRequestBuilder addIndices(String... indices) { + request.indices(ObjectArrays.concat(request.indices(), indices, String.class)); + return this; + } + + public GetFieldMappingsRequestBuilder setTypes(String... types) { + request.types(types); + return this; + } + + public GetFieldMappingsRequestBuilder addTypes(String... types) { + request.types(ObjectArrays.concat(request.types(), types, String.class)); + return this; + } + + public GetFieldMappingsRequestBuilder setIndicesOptions(IndicesOptions indicesOptions) { + request.indicesOptions(indicesOptions); + return this; + } + /** Sets the fields to retrieve. */ public GetFieldMappingsRequestBuilder setFields(String... fields) { diff --git a/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/TransportGetFieldMappingsAction.java b/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/TransportGetFieldMappingsAction.java index f52d5f50e9b..a22d344e525 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/TransportGetFieldMappingsAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/TransportGetFieldMappingsAction.java @@ -19,68 +19,282 @@ package org.elasticsearch.action.admin.indices.mapping.get; +import com.carrotsearch.hppc.ObjectIntOpenHashMap; import com.google.common.base.Predicate; import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Sets; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse.FieldMappingMetaData; -import org.elasticsearch.action.support.master.info.TransportClusterInfoAction; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.ShardIterator; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexShardMissingException; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.service.IndexService; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportService; +import org.elasticsearch.transport.*; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; /** */ -public class TransportGetFieldMappingsAction extends TransportClusterInfoAction { +public class TransportGetFieldMappingsAction extends TransportAction { + + protected final ClusterService clusterService; + private final TransportService transportService; private final IndicesService indicesService; + protected AtomicInteger shardPicker = new AtomicInteger(); + @Inject - public TransportGetFieldMappingsAction(Settings settings, TransportService transportService, ClusterService clusterService, - IndicesService indicesService, ThreadPool threadPool) { - super(settings, transportService, clusterService, threadPool); + public TransportGetFieldMappingsAction(Settings settings, ClusterService clusterService, + TransportService transportService, + IndicesService indicesService, + ThreadPool threadPool) { + super(settings, threadPool); + this.clusterService = clusterService; this.indicesService = indicesService; + this.transportService = transportService; + transportService.registerHandler(GetFieldMappingsAction.NAME, new TransportHandler()); } @Override - protected String transportAction() { - return GetFieldMappingsAction.NAME; + protected void doExecute(final GetFieldMappingsRequest request, ActionListener listener) { + ClusterState state = clusterService.state(); + String[] concreteIndices = state.metaData().concreteIndices(request.indices(), request.indicesOptions()); + request.indices(concreteIndices); + if (request.local) { + logger.trace("executing request locally"); + listener.onResponse(new GetFieldMappingsResponse(findMappings(request.indices(), request.types(), request.fields(), request.includeDefaults()))); + } else { + logger.trace("executing request with remote forwarding"); + new AsyncAction(request, listener).start(); + } } - @Override - protected GetFieldMappingsRequest newRequest() { - return new GetFieldMappingsRequest(); + protected class AsyncAction { + + private final ClusterState state; + private final GetFieldMappingsRequest origRequest; + private final ActionListener listener; + private final ObjectIntOpenHashMap mappingsIdPerIndex; + private final AtomicInteger pendingRequests; + private final AtomicArray indexErrors; + private final AtomicArray>> indexMappings; + private final ShardIterator[] shardsPerIndex; + + AsyncAction(GetFieldMappingsRequest origRequest, ActionListener listener) { + + this.origRequest = origRequest; + this.listener = listener; + this.state = clusterService.state(); + String[] concreteIndices = state.metaData().concreteIndices(origRequest.indices(), origRequest.indicesOptions()); + // normalize, will be used in the response construction. + origRequest.indices(concreteIndices); + mappingsIdPerIndex = new ObjectIntOpenHashMap(concreteIndices.length); + pendingRequests = new AtomicInteger(); + indexErrors = new AtomicArray(concreteIndices.length); + indexMappings = new AtomicArray>>(concreteIndices.length); + + shardsPerIndex = new ShardIterator[concreteIndices.length]; + + // make sure we don't have hot shards + int shardSeed = shardPicker.getAndIncrement(); + for (int id = 0; id < concreteIndices.length; id++) { + String index = concreteIndices[id]; + mappingsIdPerIndex.put(index, id); + int shardNo = state.metaData().getIndices().get(index).getNumberOfShards(); + for (int shard = shardNo - 1; shard >= 0; shard--) { + try { + shardsPerIndex[id] = clusterService.operationRouting().getShards(state, index, (shard + shardSeed) % shardNo, "_local"); + break; + } catch (IndexShardMissingException e) { + if (shard == 0) { + // out of shards... + throw e; + } + } + } + + } + } + + public void start() { + sendNodeRequestsForIndices(origRequest.indices()); + } + + private void sendNodeRequestsForIndices(String[] indices) { + HashMap> indicesPerNode = new HashMap>(); + for (int i = 0; i < indices.length; i++) { + String index = indices[i]; + int id = mappingsIdPerIndex.get(index); + ShardRouting routing = shardsPerIndex[id].firstOrNull(); + if (routing == null) { + assert false : "empty shard iterator for index [" + index + "]"; + continue; // shouldn't really happen + } + List indexList = indicesPerNode.get(routing.currentNodeId()); + if (indexList == null) { + indexList = new ArrayList(); + indicesPerNode.put(routing.currentNodeId(), indexList); + } + indexList.add(index); + } + + logger.trace("forwarding request to [{}] nodes", indicesPerNode.size()); + pendingRequests.addAndGet(indicesPerNode.size()); + DiscoveryNodes nodes = state.nodes(); + for (String nodeId : indicesPerNode.keySet()) { + final GetFieldMappingsRequest nodeRequest = new GetFieldMappingsRequest(origRequest); + nodeRequest.local(true); + nodeRequest.indices(indicesPerNode.get(nodeId).toArray(Strings.EMPTY_ARRAY)); + nodeRequest.indicesOptions(IndicesOptions.fromOptions(true, true, true, false)); + if (nodes.localNodeId().equals(nodeId)) { + try { + handleNodeResponse(findMappings(nodeRequest.indices(), nodeRequest.types(), nodeRequest.fields(), + nodeRequest.includeDefaults()), + nodeRequest); + } catch (Throwable t) { + handleNodeException(t, nodeRequest); + } finally { + + } + } else { + transportService.sendRequest(nodes.get(nodeId), GetFieldMappingsAction.NAME, + nodeRequest, new BaseTransportResponseHandler() { + @Override + public GetFieldMappingsResponse newInstance() { + return new GetFieldMappingsResponse(); + } + + @Override + public void handleResponse(GetFieldMappingsResponse nodeResponse) { + handleNodeResponse(nodeResponse.mappings(), nodeRequest); + } + + @Override + public void handleException(TransportException exp) { + handleNodeException(exp, nodeRequest); + } + + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + } + ); + } + } + + } + + protected void handleNodeException(Throwable exp, GetFieldMappingsRequest nodeRequest) { + try { + ArrayList retryIndices = new ArrayList(); + for (String index : nodeRequest.indices()) { + int id = mappingsIdPerIndex.get(index); + if (shardsPerIndex[id].nextOrNull() == null) { + // nope. + indexErrors.set(id, exp); + // no point in trying, we will return an error + retryIndices.clear(); + break; + } else { + retryIndices.add(index); + } + + } + if (retryIndices.size() != 0) { + // resend requests for failed indices + sendNodeRequestsForIndices(retryIndices.toArray(Strings.EMPTY_ARRAY)); + } + + } finally { + if (pendingRequests.decrementAndGet() == 0) { + finnishHim(); + } + } + } + + private void handleNodeResponse(ImmutableMap>> mappings, + GetFieldMappingsRequest nodeRequest) { + try { + ArrayList retryIndices = new ArrayList(); + for (String index : nodeRequest.indices()) { + ImmutableMap> indexMapping = mappings.get(index); + int id = mappingsIdPerIndex.get(index); + if (indexMapping == null) { + // advance the relevant iter, hopefully we have more. + if (shardsPerIndex[id].nextOrNull() == null) { + // nope. + indexErrors.set(id, new IndexShardMissingException(shardsPerIndex[id].shardId())); + // no point in trying, we will return an error + retryIndices.clear(); + break; + } else { + retryIndices.add(index); + } + } else { + indexMappings.set(id, indexMapping); + } + } + if (retryIndices.size() != 0) { + // resend requests for failed indices + sendNodeRequestsForIndices(retryIndices.toArray(Strings.EMPTY_ARRAY)); + } + + } finally { + if (pendingRequests.decrementAndGet() == 0) { + finnishHim(); + } + } + + } + + private void finnishHim() { + // for simplicity, just return an error if we had any + for (int i = 0; i < indexErrors.length(); i++) { + if (indexErrors.get(i) != null) { + listener.onFailure(indexErrors.get(i)); + return; + } + } + + ImmutableMap.Builder>> indexMapBuilder = ImmutableMap.builder(); + for (int id = 0; id < origRequest.indices().length; id++) { + indexMapBuilder.put(origRequest.indices()[id], indexMappings.get(id)); + } + GetFieldMappingsResponse response = new GetFieldMappingsResponse(indexMapBuilder.build()); + listener.onResponse(response); + } + } - @Override - protected GetFieldMappingsResponse newResponse() { - return new GetFieldMappingsResponse(); - } - - @Override - protected void doMasterOperation(final GetFieldMappingsRequest request, final ClusterState state, final ActionListener listener) throws ElasticsearchException { - - listener.onResponse(new GetFieldMappingsResponse(findMappings(request.indices(), request.types(), request.fields(), request.includeDefaults()))); - } private ImmutableMap>> findMappings(String[] concreteIndices, final String[] types, @@ -94,9 +308,11 @@ public class TransportGetFieldMappingsAction extends TransportClusterInfoAction< boolean isProbablySingleFieldRequest = concreteIndices.length == 1 && types.length == 1 && fields.length == 1; ImmutableMap.Builder>> indexMapBuilder = ImmutableMap.builder(); - Sets.SetView intersection = Sets.intersection(Sets.newHashSet(concreteIndices), indicesService.indices()); - for (String index : intersection) { + for (String index : concreteIndices) { IndexService indexService = indicesService.indexService(index); + if (indexService == null) { + continue; + } Collection typeIntersection; if (types.length == 0) { typeIntersection = indexService.mapperService().types(); @@ -121,9 +337,7 @@ public class TransportGetFieldMappingsAction extends TransportClusterInfoAction< } } - if (!typeMappings.isEmpty()) { - indexMapBuilder.put(index, typeMappings.immutableMap()); - } + indexMapBuilder.put(index, typeMappings.immutableMap()); } return indexMapBuilder.build(); @@ -164,7 +378,8 @@ public class TransportGetFieldMappingsAction extends TransportClusterInfoAction< return defaultValue; } - @Override @Deprecated + @Override + @Deprecated public Boolean paramAsBooleanOptional(String key, Boolean defaultValue) { return paramAsBoolean(key, defaultValue); } @@ -240,4 +455,43 @@ public class TransportGetFieldMappingsAction extends TransportClusterInfoAction< } + private class TransportHandler extends BaseTransportRequestHandler { + + @Override + public GetFieldMappingsRequest newInstance() { + return new GetFieldMappingsRequest(); + } + + @Override + public void messageReceived(GetFieldMappingsRequest request, final TransportChannel channel) throws Exception { + // no need to use threaded listener, since we just send a response + request.listenerThreaded(false); + + execute(request, new ActionListener() { + @Override + public void onResponse(GetFieldMappingsResponse result) { + try { + channel.sendResponse(result); + } catch (Throwable e) { + onFailure(e); + } + } + + @Override + public void onFailure(Throwable e) { + try { + channel.sendResponse(e); + } catch (Exception e1) { + logger.warn("Failed to send response for get field mapping", e1); + } + } + }); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + } + } \ No newline at end of file diff --git a/src/test/java/org/elasticsearch/indices/mapping/SimpleGetFieldMappingsTests.java b/src/test/java/org/elasticsearch/indices/mapping/SimpleGetFieldMappingsTests.java index e0cc9d12ab8..8b25ee95888 100644 --- a/src/test/java/org/elasticsearch/indices/mapping/SimpleGetFieldMappingsTests.java +++ b/src/test/java/org/elasticsearch/indices/mapping/SimpleGetFieldMappingsTests.java @@ -22,6 +22,8 @@ package org.elasticsearch.indices.mapping; import com.google.common.base.Predicate; import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.hamcrest.Matchers; @@ -41,8 +43,10 @@ public class SimpleGetFieldMappingsTests extends ElasticsearchIntegrationTest { @Test public void getMappingsWhereThereAreNone() { createIndex("index"); + ensureYellow(); GetFieldMappingsResponse response = client().admin().indices().prepareGetFieldMappings().get(); - assertThat(response.mappings().size(), equalTo(0)); + assertThat(response.mappings().size(), equalTo(1)); + assertThat(response.mappings().get("index").size(), equalTo(0)); assertThat(response.fieldMappings("index", "type", "field"), Matchers.nullValue()); } @@ -57,16 +61,22 @@ public class SimpleGetFieldMappingsTests extends ElasticsearchIntegrationTest { @Test public void simpleGetFieldMappings() throws Exception { - + Settings.Builder settings = ImmutableSettings.settingsBuilder() + .put("number_of_shards", randomIntBetween(1, 3), "number_of_replicas", randomIntBetween(0, 1)); + assertTrue(client().admin().indices().prepareCreate("indexa") .addMapping("typeA", getMappingForType("typeA")) .addMapping("typeB", getMappingForType("typeB")) + .setSettings(settings) .get().isAcknowledged()); assertTrue(client().admin().indices().prepareCreate("indexb") .addMapping("typeA", getMappingForType("typeA")) .addMapping("typeB", getMappingForType("typeB")) + .setSettings(settings) .get().isAcknowledged()); + ensureYellow(); + // Get mappings by full name GetFieldMappingsResponse response = client().admin().indices().prepareGetFieldMappings("indexa").setTypes("typeA").setFields("field1", "obj.subfield").get(); assertThat(response.fieldMappings("indexa", "typeA", "field1").fullName(), equalTo("field1"));