Change GetFieldMapping API to broadcast requests to nodes hosting the relevant indices.

This is due to the fact that have to have mappers in order to return the response.

Closes #5177
This commit is contained in:
Boaz Leskes 2014-02-20 22:40:42 +01:00 committed by Martijn van Groningen
parent ad8a482d19
commit 3e10810a8e
4 changed files with 405 additions and 34 deletions

View File

@ -19,21 +19,85 @@
package org.elasticsearch.action.admin.indices.mapping.get;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.info.ClusterInfoRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import java.io.IOException;
/** Request the mappings of specific fields */
public class GetFieldMappingsRequest extends ClusterInfoRequest<GetFieldMappingsRequest> {
public class GetFieldMappingsRequest extends ActionRequest<GetFieldMappingsRequest> {
protected boolean local = false;
private String[] fields = Strings.EMPTY_ARRAY;
private boolean includeDefaults = false;
private String[] indices = Strings.EMPTY_ARRAY;
private String[] types = Strings.EMPTY_ARRAY;
private IndicesOptions indicesOptions = IndicesOptions.strict();
public GetFieldMappingsRequest() {
}
public GetFieldMappingsRequest(GetFieldMappingsRequest other) {
this.local = other.local;
this.includeDefaults = other.includeDefaults;
this.indices = other.indices;
this.types = other.types;
this.indicesOptions = other.indicesOptions;
this.fields = other.fields;
}
/**
* Indicate whether the receiving node should operate based on local index information or forward requests,
* where needed, to other nodes. If running locally, request will not raise errors if running locally & missing indices.
*/
public GetFieldMappingsRequest local(boolean local) {
this.local = local;
return this;
}
public boolean local() {
return local;
}
public GetFieldMappingsRequest indices(String... indices) {
this.indices = indices;
return this;
}
public GetFieldMappingsRequest types(String... types) {
this.types = types;
return this;
}
public GetFieldMappingsRequest indicesOptions(IndicesOptions indicesOptions) {
this.indicesOptions = indicesOptions;
return this;
}
public String[] indices() {
return indices;
}
public String[] types() {
return types;
}
public IndicesOptions indicesOptions() {
return indicesOptions;
}
/** @param fields a list of fields to retrieve the mapping for */
public GetFieldMappingsRequest fields(String... fields) {
this.fields = fields;
@ -62,6 +126,14 @@ public class GetFieldMappingsRequest extends ClusterInfoRequest<GetFieldMappings
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (out.getVersion().onOrBefore(Version.V_1_0_0)) {
// This request used to inherit from MasterNodeOperationRequest
MasterNodeOperationRequest.DEFAULT_MASTER_NODE_TIMEOUT.writeTo(out);
}
out.writeStringArray(indices);
out.writeStringArray(types);
indicesOptions.writeIndicesOptions(out);
out.writeBoolean(local);
out.writeStringArray(fields);
out.writeBoolean(includeDefaults);
}
@ -69,6 +141,14 @@ public class GetFieldMappingsRequest extends ClusterInfoRequest<GetFieldMappings
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
if (in.getVersion().onOrBefore(Version.V_1_0_0)) {
// This request used to inherit from MasterNodeOperationRequest
TimeValue.readTimeValue(in);
}
indices = in.readStringArray();
types = in.readStringArray();
indicesOptions = IndicesOptions.readIndicesOptions(in);
local = in.readBoolean();
fields = in.readStringArray();
includeDefaults = in.readBoolean();
}

View File

@ -19,18 +19,45 @@
package org.elasticsearch.action.admin.indices.mapping.get;
import com.google.common.collect.ObjectArrays;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.info.ClusterInfoRequestBuilder;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.internal.InternalGenericClient;
/** A helper class to build {@link GetFieldMappingsRequest} objects */
public class GetFieldMappingsRequestBuilder extends ClusterInfoRequestBuilder<GetFieldMappingsRequest, GetFieldMappingsResponse, GetFieldMappingsRequestBuilder> {
public class GetFieldMappingsRequestBuilder extends ActionRequestBuilder<GetFieldMappingsRequest, GetFieldMappingsResponse, GetFieldMappingsRequestBuilder> {
public GetFieldMappingsRequestBuilder(InternalGenericClient client, String... indices) {
super(client, new GetFieldMappingsRequest().indices(indices));
}
public GetFieldMappingsRequestBuilder setIndices(String... indices) {
request.indices(indices);
return this;
}
public GetFieldMappingsRequestBuilder addIndices(String... indices) {
request.indices(ObjectArrays.concat(request.indices(), indices, String.class));
return this;
}
public GetFieldMappingsRequestBuilder setTypes(String... types) {
request.types(types);
return this;
}
public GetFieldMappingsRequestBuilder addTypes(String... types) {
request.types(ObjectArrays.concat(request.types(), types, String.class));
return this;
}
public GetFieldMappingsRequestBuilder setIndicesOptions(IndicesOptions indicesOptions) {
request.indicesOptions(indicesOptions);
return this;
}
/** Sets the fields to retrieve. */
public GetFieldMappingsRequestBuilder setFields(String... fields) {

View File

@ -19,68 +19,282 @@
package org.elasticsearch.action.admin.indices.mapping.get;
import com.carrotsearch.hppc.ObjectIntOpenHashMap;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse.FieldMappingMetaData;
import org.elasticsearch.action.support.master.info.TransportClusterInfoAction;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexShardMissingException;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
*/
public class TransportGetFieldMappingsAction extends TransportClusterInfoAction<GetFieldMappingsRequest, GetFieldMappingsResponse> {
public class TransportGetFieldMappingsAction extends TransportAction<GetFieldMappingsRequest, GetFieldMappingsResponse> {
protected final ClusterService clusterService;
private final TransportService transportService;
private final IndicesService indicesService;
protected AtomicInteger shardPicker = new AtomicInteger();
@Inject
public TransportGetFieldMappingsAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool) {
super(settings, transportService, clusterService, threadPool);
public TransportGetFieldMappingsAction(Settings settings, ClusterService clusterService,
TransportService transportService,
IndicesService indicesService,
ThreadPool threadPool) {
super(settings, threadPool);
this.clusterService = clusterService;
this.indicesService = indicesService;
this.transportService = transportService;
transportService.registerHandler(GetFieldMappingsAction.NAME, new TransportHandler());
}
@Override
protected String transportAction() {
return GetFieldMappingsAction.NAME;
protected void doExecute(final GetFieldMappingsRequest request, ActionListener<GetFieldMappingsResponse> listener) {
ClusterState state = clusterService.state();
String[] concreteIndices = state.metaData().concreteIndices(request.indices(), request.indicesOptions());
request.indices(concreteIndices);
if (request.local) {
logger.trace("executing request locally");
listener.onResponse(new GetFieldMappingsResponse(findMappings(request.indices(), request.types(), request.fields(), request.includeDefaults())));
} else {
logger.trace("executing request with remote forwarding");
new AsyncAction(request, listener).start();
}
}
@Override
protected GetFieldMappingsRequest newRequest() {
return new GetFieldMappingsRequest();
protected class AsyncAction {
private final ClusterState state;
private final GetFieldMappingsRequest origRequest;
private final ActionListener<GetFieldMappingsResponse> listener;
private final ObjectIntOpenHashMap<String> mappingsIdPerIndex;
private final AtomicInteger pendingRequests;
private final AtomicArray<Throwable> indexErrors;
private final AtomicArray<ImmutableMap<String, ImmutableMap<String, FieldMappingMetaData>>> indexMappings;
private final ShardIterator[] shardsPerIndex;
AsyncAction(GetFieldMappingsRequest origRequest, ActionListener<GetFieldMappingsResponse> listener) {
this.origRequest = origRequest;
this.listener = listener;
this.state = clusterService.state();
String[] concreteIndices = state.metaData().concreteIndices(origRequest.indices(), origRequest.indicesOptions());
// normalize, will be used in the response construction.
origRequest.indices(concreteIndices);
mappingsIdPerIndex = new ObjectIntOpenHashMap<String>(concreteIndices.length);
pendingRequests = new AtomicInteger();
indexErrors = new AtomicArray<Throwable>(concreteIndices.length);
indexMappings = new AtomicArray<ImmutableMap<String, ImmutableMap<String, FieldMappingMetaData>>>(concreteIndices.length);
shardsPerIndex = new ShardIterator[concreteIndices.length];
// make sure we don't have hot shards
int shardSeed = shardPicker.getAndIncrement();
for (int id = 0; id < concreteIndices.length; id++) {
String index = concreteIndices[id];
mappingsIdPerIndex.put(index, id);
int shardNo = state.metaData().getIndices().get(index).getNumberOfShards();
for (int shard = shardNo - 1; shard >= 0; shard--) {
try {
shardsPerIndex[id] = clusterService.operationRouting().getShards(state, index, (shard + shardSeed) % shardNo, "_local");
break;
} catch (IndexShardMissingException e) {
if (shard == 0) {
// out of shards...
throw e;
}
}
}
}
}
public void start() {
sendNodeRequestsForIndices(origRequest.indices());
}
private void sendNodeRequestsForIndices(String[] indices) {
HashMap<String, List<String>> indicesPerNode = new HashMap<String, List<String>>();
for (int i = 0; i < indices.length; i++) {
String index = indices[i];
int id = mappingsIdPerIndex.get(index);
ShardRouting routing = shardsPerIndex[id].firstOrNull();
if (routing == null) {
assert false : "empty shard iterator for index [" + index + "]";
continue; // shouldn't really happen
}
List<String> indexList = indicesPerNode.get(routing.currentNodeId());
if (indexList == null) {
indexList = new ArrayList<String>();
indicesPerNode.put(routing.currentNodeId(), indexList);
}
indexList.add(index);
}
logger.trace("forwarding request to [{}] nodes", indicesPerNode.size());
pendingRequests.addAndGet(indicesPerNode.size());
DiscoveryNodes nodes = state.nodes();
for (String nodeId : indicesPerNode.keySet()) {
final GetFieldMappingsRequest nodeRequest = new GetFieldMappingsRequest(origRequest);
nodeRequest.local(true);
nodeRequest.indices(indicesPerNode.get(nodeId).toArray(Strings.EMPTY_ARRAY));
nodeRequest.indicesOptions(IndicesOptions.fromOptions(true, true, true, false));
if (nodes.localNodeId().equals(nodeId)) {
try {
handleNodeResponse(findMappings(nodeRequest.indices(), nodeRequest.types(), nodeRequest.fields(),
nodeRequest.includeDefaults()),
nodeRequest);
} catch (Throwable t) {
handleNodeException(t, nodeRequest);
} finally {
}
} else {
transportService.sendRequest(nodes.get(nodeId), GetFieldMappingsAction.NAME,
nodeRequest, new BaseTransportResponseHandler<GetFieldMappingsResponse>() {
@Override
public GetFieldMappingsResponse newInstance() {
return new GetFieldMappingsResponse();
}
@Override
public void handleResponse(GetFieldMappingsResponse nodeResponse) {
handleNodeResponse(nodeResponse.mappings(), nodeRequest);
}
@Override
public void handleException(TransportException exp) {
handleNodeException(exp, nodeRequest);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}
);
}
}
}
protected void handleNodeException(Throwable exp, GetFieldMappingsRequest nodeRequest) {
try {
ArrayList<String> retryIndices = new ArrayList<String>();
for (String index : nodeRequest.indices()) {
int id = mappingsIdPerIndex.get(index);
if (shardsPerIndex[id].nextOrNull() == null) {
// nope.
indexErrors.set(id, exp);
// no point in trying, we will return an error
retryIndices.clear();
break;
} else {
retryIndices.add(index);
}
}
if (retryIndices.size() != 0) {
// resend requests for failed indices
sendNodeRequestsForIndices(retryIndices.toArray(Strings.EMPTY_ARRAY));
}
} finally {
if (pendingRequests.decrementAndGet() == 0) {
finnishHim();
}
}
}
private void handleNodeResponse(ImmutableMap<String, ImmutableMap<String, ImmutableMap<String, FieldMappingMetaData>>> mappings,
GetFieldMappingsRequest nodeRequest) {
try {
ArrayList<String> retryIndices = new ArrayList<String>();
for (String index : nodeRequest.indices()) {
ImmutableMap<String, ImmutableMap<String, FieldMappingMetaData>> indexMapping = mappings.get(index);
int id = mappingsIdPerIndex.get(index);
if (indexMapping == null) {
// advance the relevant iter, hopefully we have more.
if (shardsPerIndex[id].nextOrNull() == null) {
// nope.
indexErrors.set(id, new IndexShardMissingException(shardsPerIndex[id].shardId()));
// no point in trying, we will return an error
retryIndices.clear();
break;
} else {
retryIndices.add(index);
}
} else {
indexMappings.set(id, indexMapping);
}
}
if (retryIndices.size() != 0) {
// resend requests for failed indices
sendNodeRequestsForIndices(retryIndices.toArray(Strings.EMPTY_ARRAY));
}
} finally {
if (pendingRequests.decrementAndGet() == 0) {
finnishHim();
}
}
}
private void finnishHim() {
// for simplicity, just return an error if we had any
for (int i = 0; i < indexErrors.length(); i++) {
if (indexErrors.get(i) != null) {
listener.onFailure(indexErrors.get(i));
return;
}
}
ImmutableMap.Builder<String, ImmutableMap<String, ImmutableMap<String, FieldMappingMetaData>>> indexMapBuilder = ImmutableMap.builder();
for (int id = 0; id < origRequest.indices().length; id++) {
indexMapBuilder.put(origRequest.indices()[id], indexMappings.get(id));
}
GetFieldMappingsResponse response = new GetFieldMappingsResponse(indexMapBuilder.build());
listener.onResponse(response);
}
}
@Override
protected GetFieldMappingsResponse newResponse() {
return new GetFieldMappingsResponse();
}
@Override
protected void doMasterOperation(final GetFieldMappingsRequest request, final ClusterState state, final ActionListener<GetFieldMappingsResponse> listener) throws ElasticsearchException {
listener.onResponse(new GetFieldMappingsResponse(findMappings(request.indices(), request.types(), request.fields(), request.includeDefaults())));
}
private ImmutableMap<String, ImmutableMap<String, ImmutableMap<String, FieldMappingMetaData>>> findMappings(String[] concreteIndices,
final String[] types,
@ -94,9 +308,11 @@ public class TransportGetFieldMappingsAction extends TransportClusterInfoAction<
boolean isProbablySingleFieldRequest = concreteIndices.length == 1 && types.length == 1 && fields.length == 1;
ImmutableMap.Builder<String, ImmutableMap<String, ImmutableMap<String, FieldMappingMetaData>>> indexMapBuilder = ImmutableMap.builder();
Sets.SetView<String> intersection = Sets.intersection(Sets.newHashSet(concreteIndices), indicesService.indices());
for (String index : intersection) {
for (String index : concreteIndices) {
IndexService indexService = indicesService.indexService(index);
if (indexService == null) {
continue;
}
Collection<String> typeIntersection;
if (types.length == 0) {
typeIntersection = indexService.mapperService().types();
@ -121,9 +337,7 @@ public class TransportGetFieldMappingsAction extends TransportClusterInfoAction<
}
}
if (!typeMappings.isEmpty()) {
indexMapBuilder.put(index, typeMappings.immutableMap());
}
indexMapBuilder.put(index, typeMappings.immutableMap());
}
return indexMapBuilder.build();
@ -164,7 +378,8 @@ public class TransportGetFieldMappingsAction extends TransportClusterInfoAction<
return defaultValue;
}
@Override @Deprecated
@Override
@Deprecated
public Boolean paramAsBooleanOptional(String key, Boolean defaultValue) {
return paramAsBoolean(key, defaultValue);
}
@ -240,4 +455,43 @@ public class TransportGetFieldMappingsAction extends TransportClusterInfoAction<
}
private class TransportHandler extends BaseTransportRequestHandler<GetFieldMappingsRequest> {
@Override
public GetFieldMappingsRequest newInstance() {
return new GetFieldMappingsRequest();
}
@Override
public void messageReceived(GetFieldMappingsRequest request, final TransportChannel channel) throws Exception {
// no need to use threaded listener, since we just send a response
request.listenerThreaded(false);
execute(request, new ActionListener<GetFieldMappingsResponse>() {
@Override
public void onResponse(GetFieldMappingsResponse result) {
try {
channel.sendResponse(result);
} catch (Throwable e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(e);
} catch (Exception e1) {
logger.warn("Failed to send response for get field mapping", e1);
}
}
});
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}
}

View File

@ -22,6 +22,8 @@ package org.elasticsearch.indices.mapping;
import com.google.common.base.Predicate;
import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.hamcrest.Matchers;
@ -41,8 +43,10 @@ public class SimpleGetFieldMappingsTests extends ElasticsearchIntegrationTest {
@Test
public void getMappingsWhereThereAreNone() {
createIndex("index");
ensureYellow();
GetFieldMappingsResponse response = client().admin().indices().prepareGetFieldMappings().get();
assertThat(response.mappings().size(), equalTo(0));
assertThat(response.mappings().size(), equalTo(1));
assertThat(response.mappings().get("index").size(), equalTo(0));
assertThat(response.fieldMappings("index", "type", "field"), Matchers.nullValue());
}
@ -57,16 +61,22 @@ public class SimpleGetFieldMappingsTests extends ElasticsearchIntegrationTest {
@Test
public void simpleGetFieldMappings() throws Exception {
Settings.Builder settings = ImmutableSettings.settingsBuilder()
.put("number_of_shards", randomIntBetween(1, 3), "number_of_replicas", randomIntBetween(0, 1));
assertTrue(client().admin().indices().prepareCreate("indexa")
.addMapping("typeA", getMappingForType("typeA"))
.addMapping("typeB", getMappingForType("typeB"))
.setSettings(settings)
.get().isAcknowledged());
assertTrue(client().admin().indices().prepareCreate("indexb")
.addMapping("typeA", getMappingForType("typeA"))
.addMapping("typeB", getMappingForType("typeB"))
.setSettings(settings)
.get().isAcknowledged());
ensureYellow();
// Get mappings by full name
GetFieldMappingsResponse response = client().admin().indices().prepareGetFieldMappings("indexa").setTypes("typeA").setFields("field1", "obj.subfield").get();
assertThat(response.fieldMappings("indexa", "typeA", "field1").fullName(), equalTo("field1"));