For large responses to the get mappings request, the serialization to XContent can be extremely slow (serializing mappings is expensive since we have to decompress and deserialize the mapping source). To not introduce instability on the IO thread handling the get mappings response we should move the serialization to the management pool. The trade-off of introducing one or two new context switches for responses that are small enough to not cause trouble on the transport thread to prevent instability in case of a large number of mappings in the cluster seems worth it.
This commit is contained in:
parent
22509c95f8
commit
e09058df1a
|
@ -422,6 +422,7 @@ public class ActionModule extends AbstractModule {
|
|||
private final RestController restController;
|
||||
private final RequestValidators<PutMappingRequest> mappingRequestValidators;
|
||||
private final RequestValidators<IndicesAliasesRequest> indicesAliasesRequestRequestValidators;
|
||||
private final ThreadPool threadPool;
|
||||
|
||||
public ActionModule(boolean transportClient, Settings settings, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
IndexScopedSettings indexScopedSettings, ClusterSettings clusterSettings, SettingsFilter settingsFilter,
|
||||
|
@ -434,6 +435,7 @@ public class ActionModule extends AbstractModule {
|
|||
this.clusterSettings = clusterSettings;
|
||||
this.settingsFilter = settingsFilter;
|
||||
this.actionPlugins = actionPlugins;
|
||||
this.threadPool = threadPool;
|
||||
actions = setupActions(actionPlugins);
|
||||
actionFilters = setupActionFilters(actionPlugins);
|
||||
autoCreateIndex = transportClient ? null : new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver);
|
||||
|
@ -698,7 +700,7 @@ public class ActionModule extends AbstractModule {
|
|||
registerHandler.accept(new RestSimulateTemplateAction());
|
||||
|
||||
registerHandler.accept(new RestPutMappingAction());
|
||||
registerHandler.accept(new RestGetMappingAction());
|
||||
registerHandler.accept(new RestGetMappingAction(threadPool));
|
||||
registerHandler.accept(new RestGetFieldMappingAction());
|
||||
|
||||
registerHandler.accept(new RestRefreshAction());
|
||||
|
|
|
@ -23,6 +23,8 @@ import com.carrotsearch.hppc.cursors.ObjectCursor;
|
|||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.ElasticsearchTimeoutException;
|
||||
import org.elasticsearch.action.ActionRunnable;
|
||||
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
|
||||
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
|
@ -33,6 +35,7 @@ import org.elasticsearch.common.collect.ImmutableOpenMap;
|
|||
import org.elasticsearch.common.logging.DeprecationLogger;
|
||||
import org.elasticsearch.common.regex.Regex;
|
||||
import org.elasticsearch.common.util.set.Sets;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.indices.TypeMissingException;
|
||||
import org.elasticsearch.rest.BaseRestHandler;
|
||||
|
@ -40,7 +43,9 @@ import org.elasticsearch.rest.BytesRestResponse;
|
|||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.rest.RestResponse;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.rest.action.RestActionListener;
|
||||
import org.elasticsearch.rest.action.RestBuilderListener;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -63,6 +68,12 @@ public class RestGetMappingAction extends BaseRestHandler {
|
|||
public static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Using include_type_name in get" +
|
||||
" mapping requests is deprecated. The parameter will be removed in the next major version.";
|
||||
|
||||
private final ThreadPool threadPool;
|
||||
|
||||
public RestGetMappingAction(ThreadPool threadPool) {
|
||||
this.threadPool = threadPool;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Route> routes() {
|
||||
return unmodifiableList(asList(
|
||||
|
@ -90,10 +101,10 @@ public class RestGetMappingAction extends BaseRestHandler {
|
|||
|
||||
if (request.method().equals(HEAD)) {
|
||||
deprecationLogger.deprecatedAndMaybeLog("get_mapping_types_removal",
|
||||
"Type exists requests are deprecated, as types have been deprecated.");
|
||||
"Type exists requests are deprecated, as types have been deprecated.");
|
||||
} else if (includeTypeName == false && types.length > 0) {
|
||||
throw new IllegalArgumentException("Types cannot be provided in get mapping requests, unless" +
|
||||
" include_type_name is set to true.");
|
||||
" include_type_name is set to true.");
|
||||
}
|
||||
if (request.hasParam(INCLUDE_TYPE_NAME_PARAMETER)) {
|
||||
deprecationLogger.deprecatedAndMaybeLog("get_mapping_with_types", TYPES_DEPRECATION_MESSAGE);
|
||||
|
@ -102,58 +113,75 @@ public class RestGetMappingAction extends BaseRestHandler {
|
|||
final GetMappingsRequest getMappingsRequest = new GetMappingsRequest();
|
||||
getMappingsRequest.indices(indices).types(types);
|
||||
getMappingsRequest.indicesOptions(IndicesOptions.fromRequest(request, getMappingsRequest.indicesOptions()));
|
||||
getMappingsRequest.masterNodeTimeout(request.paramAsTime("master_timeout", getMappingsRequest.masterNodeTimeout()));
|
||||
final TimeValue timeout = request.paramAsTime("master_timeout", getMappingsRequest.masterNodeTimeout());
|
||||
getMappingsRequest.masterNodeTimeout(timeout);
|
||||
getMappingsRequest.local(request.paramAsBoolean("local", getMappingsRequest.local()));
|
||||
return channel -> client.admin().indices().getMappings(getMappingsRequest, new RestBuilderListener<GetMappingsResponse>(channel) {
|
||||
return channel -> client.admin().indices().getMappings(getMappingsRequest, new RestActionListener<GetMappingsResponse>(channel) {
|
||||
|
||||
@Override
|
||||
public RestResponse buildResponse(final GetMappingsResponse response, final XContentBuilder builder) throws Exception {
|
||||
final ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetadata>> mappingsByIndex = response.getMappings();
|
||||
if (mappingsByIndex.isEmpty() && types.length != 0) {
|
||||
builder.close();
|
||||
return new BytesRestResponse(channel, new TypeMissingException("_all", String.join(",", types)));
|
||||
}
|
||||
protected void processResponse(GetMappingsResponse getMappingsResponse) {
|
||||
final long startTimeMs = threadPool.relativeTimeInMillis();
|
||||
// Process serialization on GENERIC pool since the serialization of the raw mappings to XContent can be too slow to execute
|
||||
// on an IO thread
|
||||
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(
|
||||
ActionRunnable.wrap(this, l -> new RestBuilderListener<GetMappingsResponse>(channel) {
|
||||
@Override
|
||||
public RestResponse buildResponse(final GetMappingsResponse response,
|
||||
final XContentBuilder builder) throws Exception {
|
||||
if (threadPool.relativeTimeInMillis() - startTimeMs > timeout.millis()) {
|
||||
throw new ElasticsearchTimeoutException("Timed out getting mappings");
|
||||
}
|
||||
final ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetadata>> mappingsByIndex =
|
||||
response.getMappings();
|
||||
if (mappingsByIndex.isEmpty() && types.length != 0) {
|
||||
builder.close();
|
||||
return new BytesRestResponse(channel, new TypeMissingException("_all", String.join(",", types)));
|
||||
}
|
||||
|
||||
final Set<String> typeNames = new HashSet<>();
|
||||
for (final ObjectCursor<ImmutableOpenMap<String, MappingMetadata>> cursor : mappingsByIndex.values()) {
|
||||
for (final ObjectCursor<String> inner : cursor.value.keys()) {
|
||||
typeNames.add(inner.value);
|
||||
}
|
||||
}
|
||||
final Set<String> typeNames = new HashSet<>();
|
||||
for (final ObjectCursor<ImmutableOpenMap<String, MappingMetadata>> cursor : mappingsByIndex.values()) {
|
||||
for (final ObjectCursor<String> inner : cursor.value.keys()) {
|
||||
typeNames.add(inner.value);
|
||||
}
|
||||
}
|
||||
|
||||
final SortedSet<String> difference = Sets.sortedDifference(Arrays.stream(types).collect(Collectors.toSet()), typeNames);
|
||||
final SortedSet<String> difference =
|
||||
Sets.sortedDifference(Arrays.stream(types).collect(Collectors.toSet()), typeNames);
|
||||
|
||||
// now remove requested aliases that contain wildcards that are simple matches
|
||||
final List<String> matches = new ArrayList<>();
|
||||
outer:
|
||||
for (final String pattern : difference) {
|
||||
if (pattern.contains("*")) {
|
||||
for (final String typeName : typeNames) {
|
||||
if (Regex.simpleMatch(pattern, typeName)) {
|
||||
matches.add(pattern);
|
||||
continue outer;
|
||||
// now remove requested aliases that contain wildcards that are simple matches
|
||||
final List<String> matches = new ArrayList<>();
|
||||
outer:
|
||||
for (final String pattern : difference) {
|
||||
if (pattern.contains("*")) {
|
||||
for (final String typeName : typeNames) {
|
||||
if (Regex.simpleMatch(pattern, typeName)) {
|
||||
matches.add(pattern);
|
||||
continue outer;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
difference.removeAll(matches);
|
||||
|
||||
final RestStatus status;
|
||||
builder.startObject();
|
||||
{
|
||||
if (difference.isEmpty()) {
|
||||
status = RestStatus.OK;
|
||||
} else {
|
||||
status = RestStatus.NOT_FOUND;
|
||||
final String message = String.format(Locale.ROOT, "type" + (difference.size() == 1 ? "" : "s") +
|
||||
" [%s] missing", Strings.collectionToCommaDelimitedString(difference));
|
||||
builder.field("error", message);
|
||||
builder.field("status", status.getStatus());
|
||||
}
|
||||
response.toXContent(builder, request);
|
||||
}
|
||||
builder.endObject();
|
||||
|
||||
return new BytesRestResponse(status, builder);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
difference.removeAll(matches);
|
||||
|
||||
final RestStatus status;
|
||||
builder.startObject();
|
||||
{
|
||||
if (difference.isEmpty()) {
|
||||
status = RestStatus.OK;
|
||||
} else {
|
||||
status = RestStatus.NOT_FOUND;
|
||||
final String message = String.format(Locale.ROOT, "type" + (difference.size() == 1 ? "" : "s") +
|
||||
" [%s] missing", Strings.collectionToCommaDelimitedString(difference));
|
||||
builder.field("error", message);
|
||||
builder.field("status", status.getStatus());
|
||||
}
|
||||
response.toXContent(builder, request);
|
||||
}
|
||||
builder.endObject();
|
||||
|
||||
return new BytesRestResponse(status, builder);
|
||||
}.onResponse(getMappingsResponse)));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -27,6 +27,9 @@ import org.elasticsearch.rest.RestStatus;
|
|||
import org.elasticsearch.test.rest.FakeRestChannel;
|
||||
import org.elasticsearch.test.rest.FakeRestRequest;
|
||||
import org.elasticsearch.test.rest.RestActionTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.util.HashMap;
|
||||
|
@ -37,9 +40,17 @@ import static org.mockito.Mockito.mock;
|
|||
|
||||
public class RestGetMappingActionTests extends RestActionTestCase {
|
||||
|
||||
private ThreadPool threadPool;
|
||||
|
||||
@Before
|
||||
public void setUpAction() {
|
||||
controller().registerHandler(new RestGetMappingAction());
|
||||
threadPool = new TestThreadPool(RestValidateQueryActionTests.class.getName());
|
||||
controller().registerHandler(new RestGetMappingAction(threadPool));
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDownAction() {
|
||||
assertTrue(terminate(threadPool));
|
||||
}
|
||||
|
||||
public void testTypeExistsDeprecation() throws Exception {
|
||||
|
@ -50,7 +61,7 @@ public class RestGetMappingActionTests extends RestActionTestCase {
|
|||
.withParams(params)
|
||||
.build();
|
||||
|
||||
RestGetMappingAction handler = new RestGetMappingAction();
|
||||
RestGetMappingAction handler = new RestGetMappingAction(threadPool);
|
||||
handler.prepareRequest(request, mock(NodeClient.class));
|
||||
|
||||
assertWarnings("Type exists requests are deprecated, as types have been deprecated.");
|
||||
|
|
Loading…
Reference in New Issue