Do not allow put mapping on follower (#37675)
Today, the mapping on the follower is managed and replicated from its leader index by the ShardFollowTask. Thus, we should prevent users from modifying the mapping on the follower indices. Relates #30086
This commit is contained in:
parent
187b233571
commit
76fb573569
|
@ -357,6 +357,7 @@ public class ActionModule extends AbstractModule {
|
|||
private final AutoCreateIndex autoCreateIndex;
|
||||
private final DestructiveOperations destructiveOperations;
|
||||
private final RestController restController;
|
||||
private final TransportPutMappingAction.RequestValidators mappingRequestValidators;
|
||||
|
||||
public ActionModule(boolean transportClient, Settings settings, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
IndexScopedSettings indexScopedSettings, ClusterSettings clusterSettings, SettingsFilter settingsFilter,
|
||||
|
@ -388,6 +389,10 @@ public class ActionModule extends AbstractModule {
|
|||
restWrapper = newRestWrapper;
|
||||
}
|
||||
}
|
||||
mappingRequestValidators = new TransportPutMappingAction.RequestValidators(
|
||||
actionPlugins.stream().flatMap(p -> p.mappingRequestValidators().stream()).collect(Collectors.toList())
|
||||
);
|
||||
|
||||
if (transportClient) {
|
||||
restController = null;
|
||||
} else {
|
||||
|
@ -678,6 +683,7 @@ public class ActionModule extends AbstractModule {
|
|||
protected void configure() {
|
||||
bind(ActionFilters.class).toInstance(actionFilters);
|
||||
bind(DestructiveOperations.class).toInstance(destructiveOperations);
|
||||
bind(TransportPutMappingAction.RequestValidators.class).toInstance(mappingRequestValidators);
|
||||
|
||||
if (false == transportClient) {
|
||||
// Supporting classes only used when not a transport client
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.admin.indices.mapping.put;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.index.Index;
|
||||
|
||||
/**
|
||||
* A validator that validates a {@link PutMappingRequest} before executing it.
|
||||
* @see TransportPutMappingAction.RequestValidators
|
||||
*/
|
||||
public interface MappingRequestValidator {
|
||||
|
||||
/**
|
||||
* Validates a given put mapping request with its associated concrete indices and the current state.
|
||||
*
|
||||
* @param request the request to validate
|
||||
* @param state the current cluster state
|
||||
* @param indices the concrete indices that associated with the given put mapping request
|
||||
* @return a non-null exception indicates a reason that the given request should be aborted; otherwise returns null.
|
||||
*/
|
||||
Exception validateRequest(PutMappingRequest request, ClusterState state, Index[] indices);
|
||||
}
|
|
@ -74,6 +74,7 @@ public class PutMappingRequest extends AcknowledgedRequest<PutMappingRequest> im
|
|||
private String type;
|
||||
|
||||
private String source;
|
||||
private String origin = "";
|
||||
|
||||
private Index concreteIndex;
|
||||
|
||||
|
@ -184,6 +185,16 @@ public class PutMappingRequest extends AcknowledgedRequest<PutMappingRequest> im
|
|||
return source(buildFromSimplifiedDef(type, source));
|
||||
}
|
||||
|
||||
public String origin() {
|
||||
return origin;
|
||||
}
|
||||
|
||||
public PutMappingRequest origin(String origin) {
|
||||
// reserve "null" for bwc.
|
||||
this.origin = Objects.requireNonNull(origin);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param type
|
||||
* the mapping type
|
||||
|
@ -301,6 +312,11 @@ public class PutMappingRequest extends AcknowledgedRequest<PutMappingRequest> im
|
|||
in.readBoolean(); // updateAllTypes
|
||||
}
|
||||
concreteIndex = in.readOptionalWriteable(Index::new);
|
||||
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
|
||||
origin = in.readOptionalString();
|
||||
} else {
|
||||
origin = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -314,6 +330,9 @@ public class PutMappingRequest extends AcknowledgedRequest<PutMappingRequest> im
|
|||
out.writeBoolean(true); // updateAllTypes
|
||||
}
|
||||
out.writeOptionalWriteable(concreteIndex);
|
||||
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
|
||||
out.writeOptionalString(origin);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -37,20 +37,25 @@ import org.elasticsearch.index.IndexNotFoundException;
|
|||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
/**
|
||||
* Put mapping action.
|
||||
*/
|
||||
public class TransportPutMappingAction extends TransportMasterNodeAction<PutMappingRequest, AcknowledgedResponse> {
|
||||
|
||||
private final MetaDataMappingService metaDataMappingService;
|
||||
private final RequestValidators requestValidators;
|
||||
|
||||
@Inject
|
||||
public TransportPutMappingAction(TransportService transportService, ClusterService clusterService,
|
||||
ThreadPool threadPool, MetaDataMappingService metaDataMappingService,
|
||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
RequestValidators requestValidators) {
|
||||
super(PutMappingAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver,
|
||||
PutMappingRequest::new);
|
||||
this.metaDataMappingService = metaDataMappingService;
|
||||
this.requestValidators = requestValidators;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -82,6 +87,11 @@ public class TransportPutMappingAction extends TransportMasterNodeAction<PutMapp
|
|||
final Index[] concreteIndices = request.getConcreteIndex() == null ?
|
||||
indexNameExpressionResolver.concreteIndices(state, request)
|
||||
: new Index[] {request.getConcreteIndex()};
|
||||
final Exception validationException = requestValidators.validateRequest(request, state, concreteIndices);
|
||||
if (validationException != null) {
|
||||
listener.onFailure(validationException);
|
||||
return;
|
||||
}
|
||||
PutMappingClusterStateUpdateRequest updateRequest = new PutMappingClusterStateUpdateRequest()
|
||||
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
|
||||
.indices(concreteIndices).type(request.type())
|
||||
|
@ -107,4 +117,26 @@ public class TransportPutMappingAction extends TransportMasterNodeAction<PutMapp
|
|||
throw ex;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static class RequestValidators {
|
||||
private final Collection<MappingRequestValidator> validators;
|
||||
|
||||
public RequestValidators(Collection<MappingRequestValidator> validators) {
|
||||
this.validators = validators;
|
||||
}
|
||||
|
||||
private Exception validateRequest(PutMappingRequest request, ClusterState state, Index[] indices) {
|
||||
Exception firstException = null;
|
||||
for (MappingRequestValidator validator : validators) {
|
||||
final Exception e = validator.validateRequest(request, state, indices);
|
||||
if (firstException == null) {
|
||||
firstException = e;
|
||||
} else {
|
||||
firstException.addSuppressed(e);
|
||||
}
|
||||
}
|
||||
return firstException;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,6 +22,8 @@ package org.elasticsearch.plugins;
|
|||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.admin.indices.mapping.put.MappingRequestValidator;
|
||||
import org.elasticsearch.action.admin.indices.mapping.put.TransportPutMappingAction;
|
||||
import org.elasticsearch.action.support.ActionFilter;
|
||||
import org.elasticsearch.action.support.TransportAction;
|
||||
import org.elasticsearch.action.support.TransportActions;
|
||||
|
@ -179,4 +181,12 @@ public interface ActionPlugin {
|
|||
return Objects.hash(action, transportAction, supportTransportActions);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a collection of validators that are used by {@link TransportPutMappingAction.RequestValidators} to
|
||||
* validate a {@link org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest} before the executing it.
|
||||
*/
|
||||
default Collection<MappingRequestValidator> mappingRequestValidators() {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,99 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.admin.indices.mapping.put;
|
||||
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.plugins.ActionPlugin;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class ValidateMappingRequestPluginIT extends ESSingleNodeTestCase {
|
||||
static final Map<String, Collection<String>> allowedOrigins = ConcurrentCollections.newConcurrentMap();
|
||||
public static class TestPlugin extends Plugin implements ActionPlugin {
|
||||
@Override
|
||||
public Collection<MappingRequestValidator> mappingRequestValidators() {
|
||||
return Collections.singletonList((request, state, indices) -> {
|
||||
for (Index index : indices) {
|
||||
if (allowedOrigins.getOrDefault(index.getName(), Collections.emptySet()).contains(request.origin()) == false) {
|
||||
return new IllegalStateException("not allowed: index[" + index.getName() + "] origin[" + request.origin() + "]");
|
||||
}
|
||||
}
|
||||
return null;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> getPlugins() {
|
||||
return Collections.singletonList(TestPlugin.class);
|
||||
}
|
||||
|
||||
public void testValidateMappingRequest() {
|
||||
createIndex("index_1");
|
||||
createIndex("index_2");
|
||||
allowedOrigins.put("index_1", Arrays.asList("1", "2"));
|
||||
allowedOrigins.put("index_2", Arrays.asList("2", "3"));
|
||||
{
|
||||
String origin = randomFrom("", "3", "4", "5");
|
||||
PutMappingRequest request = new PutMappingRequest().indices("index_1").type("doc").source("t1", "type=keyword").origin(origin);
|
||||
Exception e = expectThrows(IllegalStateException.class, () -> client().admin().indices().putMapping(request).actionGet());
|
||||
assertThat(e.getMessage(), equalTo("not allowed: index[index_1] origin[" + origin + "]"));
|
||||
}
|
||||
{
|
||||
PutMappingRequest request = new PutMappingRequest().indices("index_1").origin(randomFrom("1", "2"))
|
||||
.type("doc").source("t1", "type=keyword");
|
||||
assertAcked(client().admin().indices().putMapping(request).actionGet());
|
||||
}
|
||||
|
||||
{
|
||||
String origin = randomFrom("", "1", "4", "5");
|
||||
PutMappingRequest request = new PutMappingRequest().indices("index_2").type("doc").source("t2", "type=keyword").origin(origin);
|
||||
Exception e = expectThrows(IllegalStateException.class, () -> client().admin().indices().putMapping(request).actionGet());
|
||||
assertThat(e.getMessage(), equalTo("not allowed: index[index_2] origin[" + origin + "]"));
|
||||
}
|
||||
{
|
||||
PutMappingRequest request = new PutMappingRequest().indices("index_2").origin(randomFrom("2", "3"))
|
||||
.type("doc").source("t1", "type=keyword");
|
||||
assertAcked(client().admin().indices().putMapping(request).actionGet());
|
||||
}
|
||||
|
||||
{
|
||||
String origin = randomFrom("", "1", "3", "4");
|
||||
PutMappingRequest request = new PutMappingRequest().indices("*").type("doc").source("t3", "type=keyword").origin(origin);
|
||||
Exception e = expectThrows(IllegalStateException.class, () -> client().admin().indices().putMapping(request).actionGet());
|
||||
assertThat(e.getMessage(), containsString("not allowed:"));
|
||||
}
|
||||
{
|
||||
PutMappingRequest request = new PutMappingRequest().indices("index_2").origin("2")
|
||||
.type("doc").source("t3", "type=keyword");
|
||||
assertAcked(client().admin().indices().putMapping(request).actionGet());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -9,6 +9,7 @@ package org.elasticsearch.xpack.ccr;
|
|||
import org.apache.lucene.util.SetOnce;
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.admin.indices.mapping.put.MappingRequestValidator;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
|
@ -45,6 +46,7 @@ import org.elasticsearch.threadpool.FixedExecutorBuilder;
|
|||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.watcher.ResourceWatcherService;
|
||||
import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator;
|
||||
import org.elasticsearch.xpack.ccr.action.CcrRequests;
|
||||
import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
|
||||
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
|
||||
import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor;
|
||||
|
@ -312,4 +314,9 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
|
|||
|
||||
protected XPackLicenseState getLicenseState() { return XPackPlugin.getSharedLicenseState(); }
|
||||
|
||||
@Override
|
||||
public Collection<MappingRequestValidator> mappingRequestValidators() {
|
||||
return Collections.singletonList(CcrRequests.CCR_PUT_MAPPING_REQUEST_VALIDATOR);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -5,10 +5,20 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.ccr.action;
|
||||
|
||||
import org.elasticsearch.ElasticsearchStatusException;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
|
||||
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
|
||||
import org.elasticsearch.action.admin.indices.mapping.put.MappingRequestValidator;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.xpack.ccr.CcrSettings;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public final class CcrRequests {
|
||||
|
||||
|
@ -24,8 +34,27 @@ public final class CcrRequests {
|
|||
|
||||
public static PutMappingRequest putMappingRequest(String followerIndex, MappingMetaData mappingMetaData) {
|
||||
PutMappingRequest putMappingRequest = new PutMappingRequest(followerIndex);
|
||||
putMappingRequest.origin("ccr");
|
||||
putMappingRequest.type(mappingMetaData.type());
|
||||
putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON);
|
||||
return putMappingRequest;
|
||||
}
|
||||
|
||||
public static final MappingRequestValidator CCR_PUT_MAPPING_REQUEST_VALIDATOR = (request, state, indices) -> {
|
||||
if (request.origin() == null) {
|
||||
return null; // a put-mapping-request on old versions does not have origin.
|
||||
}
|
||||
final List<Index> followingIndices = Arrays.stream(indices)
|
||||
.filter(index -> {
|
||||
final IndexMetaData indexMetaData = state.metaData().index(index);
|
||||
return indexMetaData != null && CcrSettings.CCR_FOLLOWING_INDEX_SETTING.get(indexMetaData.getSettings());
|
||||
}).collect(Collectors.toList());
|
||||
if (followingIndices.isEmpty() == false && "ccr".equals(request.origin()) == false) {
|
||||
final String errorMessage = "can't put mapping to the following indices "
|
||||
+ "[" + followingIndices.stream().map(Index::getName).collect(Collectors.joining(", ")) + "]; "
|
||||
+ "the mapping of the following indices are self-replicated from its leader indices";
|
||||
return new ElasticsearchStatusException(errorMessage, RestStatus.FORBIDDEN);
|
||||
}
|
||||
return null;
|
||||
};
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
package org.elasticsearch.xpack.ccr;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ElasticsearchStatusException;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
||||
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
|
||||
|
@ -46,6 +47,7 @@ import org.elasticsearch.index.IndexNotFoundException;
|
|||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.tasks.TaskInfo;
|
||||
import org.elasticsearch.xpack.CcrIntegTestCase;
|
||||
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
|
||||
|
@ -209,6 +211,25 @@ public class IndexFollowingIT extends CcrIntegTestCase {
|
|||
assertThat(XContentMapValues.extractValue("properties.k", mappingMetaData.sourceAsMap()), nullValue());
|
||||
}
|
||||
|
||||
public void testDoNotAllowPutMappingToFollower() throws Exception {
|
||||
final String leaderIndexSettings = getIndexSettings(between(1, 2), between(0, 1),
|
||||
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
|
||||
assertAcked(leaderClient().admin().indices().prepareCreate("index-1").setSource(leaderIndexSettings, XContentType.JSON));
|
||||
followerClient().execute(PutFollowAction.INSTANCE, putFollow("index-1", "index-2")).get();
|
||||
PutMappingRequest putMappingRequest = new PutMappingRequest("index-2").type("doc").source("new_field", "type=keyword");
|
||||
ElasticsearchStatusException forbiddenException = expectThrows(ElasticsearchStatusException.class,
|
||||
() -> followerClient().admin().indices().putMapping(putMappingRequest).actionGet());
|
||||
assertThat(forbiddenException.getMessage(),
|
||||
equalTo("can't put mapping to the following indices [index-2]; " +
|
||||
"the mapping of the following indices are self-replicated from its leader indices"));
|
||||
assertThat(forbiddenException.status(), equalTo(RestStatus.FORBIDDEN));
|
||||
pauseFollow("index-2");
|
||||
followerClient().admin().indices().close(new CloseIndexRequest("index-2")).actionGet();
|
||||
assertAcked(followerClient().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request("index-2")).actionGet());
|
||||
followerClient().admin().indices().open(new OpenIndexRequest("index-2")).actionGet();
|
||||
assertAcked(followerClient().admin().indices().putMapping(putMappingRequest).actionGet());
|
||||
}
|
||||
|
||||
public void testFollowIndex_backlog() throws Exception {
|
||||
int numberOfShards = between(1, 5);
|
||||
String leaderIndexSettings = getIndexSettings(numberOfShards, between(0, 1),
|
||||
|
|
|
@ -7,6 +7,7 @@ package org.elasticsearch.xpack.core;
|
|||
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.admin.indices.mapping.put.MappingRequestValidator;
|
||||
import org.elasticsearch.action.support.ActionFilter;
|
||||
import org.elasticsearch.bootstrap.BootstrapCheck;
|
||||
import org.elasticsearch.client.Client;
|
||||
|
@ -427,6 +428,12 @@ public class LocalStateCompositeXPackPlugin extends XPackPlugin implements Scrip
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<MappingRequestValidator> mappingRequestValidators() {
|
||||
return filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.mappingRequestValidators().stream())
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private <T> List<T> filterPlugins(Class<T> type) {
|
||||
return plugins.stream().filter(x -> type.isAssignableFrom(x.getClass())).map(p -> ((T)p))
|
||||
.collect(Collectors.toList());
|
||||
|
|
Loading…
Reference in New Issue