Introduce new put mapping action for dynamic mapping updates. (#58746)

Backport of #58419

Mapping updates that originate from indexing a document with unmapped fields will use this new action
instead of the current put mapping action. This way on the security side, authorization logic
can easily determine whether a mapping update is automatically generated or a mapping update originates
from the put mapping api.

The new auto put mapping action is only used if all nodes are on the version that supports it.
This commit is contained in:
Martijn van Groningen 2020-06-30 18:02:31 +02:00 committed by GitHub
parent 8c93f4e154
commit adcef93a6c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 267 additions and 42 deletions

View File

@ -137,8 +137,10 @@ import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction;
import org.elasticsearch.action.admin.indices.mapping.get.TransportGetFieldMappingsAction;
import org.elasticsearch.action.admin.indices.mapping.get.TransportGetFieldMappingsIndexAction;
import org.elasticsearch.action.admin.indices.mapping.get.TransportGetMappingsAction;
import org.elasticsearch.action.admin.indices.mapping.put.AutoPutMappingAction;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.mapping.put.TransportAutoPutMappingAction;
import org.elasticsearch.action.admin.indices.mapping.put.TransportPutMappingAction;
import org.elasticsearch.action.admin.indices.open.OpenIndexAction;
import org.elasticsearch.action.admin.indices.open.TransportOpenIndexAction;
@ -557,6 +559,7 @@ public class ActionModule extends AbstractModule {
actions.register(GetFieldMappingsAction.INSTANCE, TransportGetFieldMappingsAction.class,
TransportGetFieldMappingsIndexAction.class);
actions.register(PutMappingAction.INSTANCE, TransportPutMappingAction.class);
actions.register(AutoPutMappingAction.INSTANCE, TransportAutoPutMappingAction.class);
actions.register(IndicesAliasesAction.INSTANCE, TransportIndicesAliasesAction.class);
actions.register(UpdateSettingsAction.INSTANCE, TransportUpdateSettingsAction.class);
actions.register(AnalyzeAction.INSTANCE, TransportAnalyzeAction.class);

View File

@ -0,0 +1,34 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.mapping.put;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
public class AutoPutMappingAction extends ActionType<AcknowledgedResponse> {
public static final AutoPutMappingAction INSTANCE = new AutoPutMappingAction();
public static final String NAME = "indices:admin/mapping/auto_put";
private AutoPutMappingAction() {
super(NAME, AcknowledgedResponse::new);
}
}

View File

@ -139,7 +139,7 @@ public class PutMappingRequest extends AcknowledgedRequest<PutMappingRequest> im
* Sets a concrete index for this put mapping request.
*/
public PutMappingRequest setConcreteIndex(Index index) {
Objects.requireNonNull(indices, "index must not be null");
Objects.requireNonNull(index, "index must not be null");
this.concreteIndex = index;
return this;
}

View File

@ -0,0 +1,92 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.mapping.put;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetadataMappingService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.index.Index;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import static org.elasticsearch.action.admin.indices.mapping.put.TransportPutMappingAction.performMappingUpdate;
public class TransportAutoPutMappingAction extends TransportMasterNodeAction<PutMappingRequest, AcknowledgedResponse> {
private final MetadataMappingService metadataMappingService;
@Inject
public TransportAutoPutMappingAction(
final TransportService transportService,
final ClusterService clusterService,
final ThreadPool threadPool,
final MetadataMappingService metadataMappingService,
final ActionFilters actionFilters,
final IndexNameExpressionResolver indexNameExpressionResolver) {
super(AutoPutMappingAction.NAME, transportService, clusterService, threadPool, actionFilters,
PutMappingRequest::new, indexNameExpressionResolver);
this.metadataMappingService = metadataMappingService;
}
@Override
protected String executor() {
// we go async right away
return ThreadPool.Names.SAME;
}
@Override
protected AcknowledgedResponse read(StreamInput in) throws IOException {
return new AcknowledgedResponse(in);
}
@Override
protected void doExecute(Task task, PutMappingRequest request, ActionListener<AcknowledgedResponse> listener) {
if (request.getConcreteIndex() == null) {
throw new IllegalArgumentException("concrete index missing");
}
super.doExecute(task, request, listener);
}
@Override
protected ClusterBlockException checkBlock(PutMappingRequest request, ClusterState state) {
String[] indices = new String[] {request.getConcreteIndex().getName()};
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, indices);
}
@Override
protected void masterOperation(final PutMappingRequest request, final ClusterState state,
final ActionListener<AcknowledgedResponse> listener) {
final Index[] concreteIndices = new Index[] {request.getConcreteIndex()};
performMappingUpdate(concreteIndices, request, listener, metadataMappingService);
}
}

View File

@ -42,6 +42,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
@ -104,6 +105,18 @@ public class TransportPutMappingAction extends TransportMasterNodeAction<PutMapp
listener.onFailure(maybeValidationException.get());
return;
}
performMappingUpdate(concreteIndices, request, listener, metadataMappingService);
} catch (IndexNotFoundException ex) {
logger.debug(() -> new ParameterizedMessage("failed to put mappings on indices [{}], type [{}]",
request.indices(), request.type()), ex);
throw ex;
}
}
static void performMappingUpdate(Index[] concreteIndices,
PutMappingRequest request,
ActionListener<AcknowledgedResponse> listener,
MetadataMappingService metadataMappingService) {
PutMappingClusterStateUpdateRequest updateRequest = new PutMappingClusterStateUpdateRequest()
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
.indices(concreteIndices).type(request.type())
@ -118,16 +131,11 @@ public class TransportPutMappingAction extends TransportMasterNodeAction<PutMapp
@Override
public void onFailure(Exception t) {
logger.debug(() -> new ParameterizedMessage("failed to put mappings on indices [{}], type [{}]",
concreteIndices, request.type()), t);
logger.debug(() -> new ParameterizedMessage("failed to put mappings on indices [{}]",
Arrays.asList(concreteIndices)), t);
listener.onFailure(t);
}
});
} catch (IndexNotFoundException ex) {
logger.debug(() -> new ParameterizedMessage("failed to put mappings on indices [{}], type [{}]",
request.indices(), request.type()), ex);
throw ex;
}
}
}

View File

@ -20,11 +20,14 @@
package org.elasticsearch.cluster.action.index;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.admin.indices.mapping.put.AutoPutMappingAction;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
@ -57,11 +60,13 @@ public class MappingUpdatedAction {
private IndicesAdminClient client;
private volatile TimeValue dynamicMappingUpdateTimeout;
private final AdjustableSemaphore semaphore;
private final ClusterService clusterService;
@Inject
public MappingUpdatedAction(Settings settings, ClusterSettings clusterSettings) {
public MappingUpdatedAction(Settings settings, ClusterSettings clusterSettings, ClusterService clusterService) {
this.dynamicMappingUpdateTimeout = INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING.get(settings);
this.semaphore = new AdjustableSemaphore(INDICES_MAX_IN_FLIGHT_UPDATES_SETTING.get(settings), true);
this.clusterService = clusterService;
clusterSettings.addSettingsUpdateConsumer(INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING, this::setDynamicMappingUpdateTimeout);
clusterSettings.addSettingsUpdateConsumer(INDICES_MAX_IN_FLIGHT_UPDATES_SETTING, this::setMaxInFlightUpdates);
}
@ -115,19 +120,19 @@ public class MappingUpdatedAction {
// can be overridden by tests
protected void sendUpdateMapping(Index index, String type, Mapping mappingUpdate, ActionListener<Void> listener) {
client.preparePutMapping().setConcreteIndex(index).setType(type).setSource(mappingUpdate.toString(), XContentType.JSON)
.setMasterNodeTimeout(dynamicMappingUpdateTimeout).setTimeout(TimeValue.ZERO)
.execute(new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
listener.onResponse(null);
PutMappingRequest putMappingRequest = new PutMappingRequest();
putMappingRequest.setConcreteIndex(index);
putMappingRequest.type(type);
putMappingRequest.source(mappingUpdate.toString(), XContentType.JSON);
putMappingRequest.masterNodeTimeout(dynamicMappingUpdateTimeout);
putMappingRequest.timeout(TimeValue.ZERO);
if (clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_7_9_0)) {
client.execute(AutoPutMappingAction.INSTANCE, putMappingRequest,
ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure));
} else {
client.putMapping(putMappingRequest,
ActionListener.wrap(r -> listener.onResponse(null), e -> listener.onFailure(unwrapException(e))));
}
@Override
public void onFailure(Exception e) {
listener.onFailure(unwrapException(e));
}
});
}
// todo: this explicit unwrap should not be necessary, but is until guessRootCause is fixed to allow wrapped non-es exception.

View File

@ -18,18 +18,40 @@
*/
package org.elasticsearch.cluster.action.index;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.mapping.put.AutoPutMappingAction;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction.AdjustableSemaphore;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Map;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.ContentPath;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.MetadataFieldMapper;
import org.elasticsearch.index.mapper.RootObjectMapper;
import org.elasticsearch.test.ESTestCase;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class MappingUpdatedActionTests extends ESTestCase {
public void testAdjustableSemaphore() {
@ -83,7 +105,7 @@ public class MappingUpdatedActionTests extends ESTestCase {
List<ActionListener<Void>> inFlightListeners = new CopyOnWriteArrayList<>();
final MappingUpdatedAction mua = new MappingUpdatedAction(Settings.builder()
.put(MappingUpdatedAction.INDICES_MAX_IN_FLIGHT_UPDATES_SETTING.getKey(), 1).build(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) {
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null) {
@Override
protected void sendUpdateMapping(Index index, String type, Mapping mappingUpdate, ActionListener<Void> listener) {
@ -115,4 +137,58 @@ public class MappingUpdatedActionTests extends ESTestCase {
inFlightListeners.remove(0).onResponse(null);
assertTrue(fut2.isDone());
}
public void testSendUpdateMappingUsingPutMappingAction() {
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("first", buildNewFakeTransportAddress(), Version.V_7_8_0))
.build();
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).nodes(nodes).build();
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(clusterState);
IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class);
AdminClient adminClient = mock(AdminClient.class);
when(adminClient.indices()).thenReturn(indicesAdminClient);
Client client = mock(Client.class);
when(client.admin()).thenReturn(adminClient);
MappingUpdatedAction mua = new MappingUpdatedAction(Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), clusterService);
mua.setClient(client);
Settings indexSettings = Settings.builder().put(SETTING_VERSION_CREATED, Version.CURRENT).build();
final Mapper.BuilderContext context = new Mapper.BuilderContext(indexSettings, new ContentPath());
RootObjectMapper rootObjectMapper = new RootObjectMapper.Builder("name").build(context);
Mapping update = new Mapping(Version.V_7_8_0, rootObjectMapper, new MetadataFieldMapper[0], Map.of());
mua.sendUpdateMapping(new Index("name", "uuid"), "type", update, ActionListener.wrap(() -> {}));
verify(indicesAdminClient).putMapping(any(), any());
}
public void testSendUpdateMappingUsingAutoPutMappingAction() {
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("first", buildNewFakeTransportAddress(), Version.V_7_9_0))
.build();
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).nodes(nodes).build();
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(clusterState);
IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class);
AdminClient adminClient = mock(AdminClient.class);
when(adminClient.indices()).thenReturn(indicesAdminClient);
Client client = mock(Client.class);
when(client.admin()).thenReturn(adminClient);
MappingUpdatedAction mua = new MappingUpdatedAction(Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), clusterService);
mua.setClient(client);
Settings indexSettings = Settings.builder().put(SETTING_VERSION_CREATED, Version.CURRENT).build();
final Mapper.BuilderContext context = new Mapper.BuilderContext(indexSettings, new ContentPath());
RootObjectMapper rootObjectMapper = new RootObjectMapper.Builder("name").build(context);
Mapping update = new Mapping(Version.V_7_9_0, rootObjectMapper, new MetadataFieldMapper[0], Map.of());
mua.sendUpdateMapping(new Index("name", "uuid"), "type", update, ActionListener.wrap(() -> {}));
verify(indicesAdminClient).execute(eq(AutoPutMappingAction.INSTANCE), any(), any());
}
}

View File

@ -58,7 +58,9 @@ import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
import org.elasticsearch.action.admin.indices.mapping.put.AutoPutMappingAction;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction;
import org.elasticsearch.action.admin.indices.mapping.put.TransportAutoPutMappingAction;
import org.elasticsearch.action.admin.indices.mapping.put.TransportPutMappingAction;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresAction;
import org.elasticsearch.action.admin.indices.shards.TransportIndicesShardStoresAction;
@ -1501,7 +1503,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
metadataCreateIndexService,
actionFilters, indexNameExpressionResolver
));
final MappingUpdatedAction mappingUpdatedAction = new MappingUpdatedAction(settings, clusterSettings);
final MappingUpdatedAction mappingUpdatedAction = new MappingUpdatedAction(settings, clusterSettings, clusterService);
mappingUpdatedAction.setClient(client);
final TransportShardBulkAction transportShardBulkAction = new TransportShardBulkAction(settings, transportService,
clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedAction, new UpdateHelper(scriptService),
@ -1528,6 +1530,9 @@ public class SnapshotResiliencyTests extends ESTestCase {
actions.put(PutMappingAction.INSTANCE,
new TransportPutMappingAction(transportService, clusterService, threadPool, metadataMappingService,
actionFilters, indexNameExpressionResolver, new RequestValidators<>(Collections.emptyList())));
actions.put(AutoPutMappingAction.INSTANCE,
new TransportAutoPutMappingAction(transportService, clusterService, threadPool, metadataMappingService,
actionFilters, indexNameExpressionResolver));
final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);
final SearchTransportService searchTransportService = new SearchTransportService(transportService,
SearchExecutionStatsCollector.makeWrapper(responseCollectorService));

View File

@ -20,6 +20,7 @@ import org.elasticsearch.action.admin.indices.exists.types.TypesExistsAction;
import org.elasticsearch.action.admin.indices.get.GetIndexAction;
import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsAction;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction;
import org.elasticsearch.action.admin.indices.mapping.put.AutoPutMappingAction;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction;
import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryAction;
@ -51,13 +52,14 @@ public final class IndexPrivilege extends Privilege {
private static final Automaton READ_CROSS_CLUSTER_AUTOMATON = patterns("internal:transport/proxy/indices:data/read/*",
ClusterSearchShardsAction.NAME);
private static final Automaton CREATE_AUTOMATON = patterns("indices:data/write/index*", "indices:data/write/bulk*",
PutMappingAction.NAME);
PutMappingAction.NAME, AutoPutMappingAction.NAME);
private static final Automaton CREATE_DOC_AUTOMATON = patterns("indices:data/write/index", "indices:data/write/index[*",
"indices:data/write/index:op_type/create", "indices:data/write/bulk*", PutMappingAction.NAME);
private static final Automaton INDEX_AUTOMATON =
patterns("indices:data/write/index*", "indices:data/write/bulk*", "indices:data/write/update*", PutMappingAction.NAME);
"indices:data/write/index:op_type/create", "indices:data/write/bulk*", PutMappingAction.NAME, AutoPutMappingAction.NAME);
private static final Automaton INDEX_AUTOMATON = patterns("indices:data/write/index*", "indices:data/write/bulk*",
"indices:data/write/update*", PutMappingAction.NAME, AutoPutMappingAction.NAME);
private static final Automaton DELETE_AUTOMATON = patterns("indices:data/write/delete*", "indices:data/write/bulk*");
private static final Automaton WRITE_AUTOMATON = patterns("indices:data/write/*", PutMappingAction.NAME);
private static final Automaton WRITE_AUTOMATON = patterns("indices:data/write/*", PutMappingAction.NAME,
AutoPutMappingAction.NAME);
private static final Automaton MONITOR_AUTOMATON = patterns("indices:monitor/*");
private static final Automaton MANAGE_AUTOMATON =
unionAndMinimize(Arrays.asList(MONITOR_AUTOMATON, patterns("indices:admin/*")));