diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 142aa6bde74..8a8cea82b0a 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -357,6 +357,7 @@ public class ActionModule extends AbstractModule { private final AutoCreateIndex autoCreateIndex; private final DestructiveOperations destructiveOperations; private final RestController restController; + private final TransportPutMappingAction.RequestValidators mappingRequestValidators; public ActionModule(boolean transportClient, Settings settings, IndexNameExpressionResolver indexNameExpressionResolver, IndexScopedSettings indexScopedSettings, ClusterSettings clusterSettings, SettingsFilter settingsFilter, @@ -388,6 +389,10 @@ public class ActionModule extends AbstractModule { restWrapper = newRestWrapper; } } + mappingRequestValidators = new TransportPutMappingAction.RequestValidators( + actionPlugins.stream().flatMap(p -> p.mappingRequestValidators().stream()).collect(Collectors.toList()) + ); + if (transportClient) { restController = null; } else { @@ -678,6 +683,7 @@ public class ActionModule extends AbstractModule { protected void configure() { bind(ActionFilters.class).toInstance(actionFilters); bind(DestructiveOperations.class).toInstance(destructiveOperations); + bind(TransportPutMappingAction.RequestValidators.class).toInstance(mappingRequestValidators); if (false == transportClient) { // Supporting classes only used when not a transport client diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/MappingRequestValidator.java b/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/MappingRequestValidator.java new file mode 100644 index 00000000000..8d6608c5758 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/MappingRequestValidator.java @@ -0,0 +1,40 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.mapping.put; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.index.Index; + +/** + * A validator that validates a {@link PutMappingRequest} before executing it. + * @see TransportPutMappingAction.RequestValidators + */ +public interface MappingRequestValidator { + + /** + * Validates a given put mapping request with its associated concrete indices and the current state. + * + * @param request the request to validate + * @param state the current cluster state + * @param indices the concrete indices that associated with the given put mapping request + * @return a non-null exception indicates a reason that the given request should be aborted; otherwise returns null. + */ + Exception validateRequest(PutMappingRequest request, ClusterState state, Index[] indices); +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingRequest.java index 68687081453..9b903a81e03 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingRequest.java @@ -74,6 +74,7 @@ public class PutMappingRequest extends AcknowledgedRequest im private String type; private String source; + private String origin = ""; private Index concreteIndex; @@ -184,6 +185,16 @@ public class PutMappingRequest extends AcknowledgedRequest im return source(buildFromSimplifiedDef(type, source)); } + public String origin() { + return origin; + } + + public PutMappingRequest origin(String origin) { + // reserve "null" for bwc. + this.origin = Objects.requireNonNull(origin); + return this; + } + /** * @param type * the mapping type @@ -301,6 +312,11 @@ public class PutMappingRequest extends AcknowledgedRequest im in.readBoolean(); // updateAllTypes } concreteIndex = in.readOptionalWriteable(Index::new); + if (in.getVersion().onOrAfter(Version.V_7_0_0)) { + origin = in.readOptionalString(); + } else { + origin = null; + } } @Override @@ -314,6 +330,9 @@ public class PutMappingRequest extends AcknowledgedRequest im out.writeBoolean(true); // updateAllTypes } out.writeOptionalWriteable(concreteIndex); + if (out.getVersion().onOrAfter(Version.V_7_0_0)) { + out.writeOptionalString(origin); + } } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java index 565fd0616d0..acd0d102814 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/put/TransportPutMappingAction.java @@ -37,20 +37,25 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.util.Collection; + /** * Put mapping action. */ public class TransportPutMappingAction extends TransportMasterNodeAction { private final MetaDataMappingService metaDataMappingService; + private final RequestValidators requestValidators; @Inject public TransportPutMappingAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool, MetaDataMappingService metaDataMappingService, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + RequestValidators requestValidators) { super(PutMappingAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, PutMappingRequest::new); this.metaDataMappingService = metaDataMappingService; + this.requestValidators = requestValidators; } @Override @@ -82,6 +87,11 @@ public class TransportPutMappingAction extends TransportMasterNodeAction validators; + + public RequestValidators(Collection validators) { + this.validators = validators; + } + + private Exception validateRequest(PutMappingRequest request, ClusterState state, Index[] indices) { + Exception firstException = null; + for (MappingRequestValidator validator : validators) { + final Exception e = validator.validateRequest(request, state, indices); + if (firstException == null) { + firstException = e; + } else { + firstException.addSuppressed(e); + } + } + return firstException; + } + } } diff --git a/server/src/main/java/org/elasticsearch/plugins/ActionPlugin.java b/server/src/main/java/org/elasticsearch/plugins/ActionPlugin.java index c0d94c3f000..adc2fa8f0b2 100644 --- a/server/src/main/java/org/elasticsearch/plugins/ActionPlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/ActionPlugin.java @@ -22,6 +22,8 @@ package org.elasticsearch.plugins; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.admin.indices.mapping.put.MappingRequestValidator; +import org.elasticsearch.action.admin.indices.mapping.put.TransportPutMappingAction; import org.elasticsearch.action.support.ActionFilter; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportActions; @@ -179,4 +181,12 @@ public interface ActionPlugin { return Objects.hash(action, transportAction, supportTransportActions); } } + + /** + * Returns a collection of validators that are used by {@link TransportPutMappingAction.RequestValidators} to + * validate a {@link org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest} before the executing it. + */ + default Collection mappingRequestValidators() { + return Collections.emptyList(); + } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/mapping/put/ValidateMappingRequestPluginIT.java b/server/src/test/java/org/elasticsearch/action/admin/indices/mapping/put/ValidateMappingRequestPluginIT.java new file mode 100644 index 00000000000..b25c9ecb5fc --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/mapping/put/ValidateMappingRequestPluginIT.java @@ -0,0 +1,99 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.mapping.put; + +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.index.Index; +import org.elasticsearch.plugins.ActionPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESSingleNodeTestCase; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + +public class ValidateMappingRequestPluginIT extends ESSingleNodeTestCase { + static final Map> allowedOrigins = ConcurrentCollections.newConcurrentMap(); + public static class TestPlugin extends Plugin implements ActionPlugin { + @Override + public Collection mappingRequestValidators() { + return Collections.singletonList((request, state, indices) -> { + for (Index index : indices) { + if (allowedOrigins.getOrDefault(index.getName(), Collections.emptySet()).contains(request.origin()) == false) { + return new IllegalStateException("not allowed: index[" + index.getName() + "] origin[" + request.origin() + "]"); + } + } + return null; + }); + } + } + + @Override + protected Collection> getPlugins() { + return Collections.singletonList(TestPlugin.class); + } + + public void testValidateMappingRequest() { + createIndex("index_1"); + createIndex("index_2"); + allowedOrigins.put("index_1", Arrays.asList("1", "2")); + allowedOrigins.put("index_2", Arrays.asList("2", "3")); + { + String origin = randomFrom("", "3", "4", "5"); + PutMappingRequest request = new PutMappingRequest().indices("index_1").type("doc").source("t1", "type=keyword").origin(origin); + Exception e = expectThrows(IllegalStateException.class, () -> client().admin().indices().putMapping(request).actionGet()); + assertThat(e.getMessage(), equalTo("not allowed: index[index_1] origin[" + origin + "]")); + } + { + PutMappingRequest request = new PutMappingRequest().indices("index_1").origin(randomFrom("1", "2")) + .type("doc").source("t1", "type=keyword"); + assertAcked(client().admin().indices().putMapping(request).actionGet()); + } + + { + String origin = randomFrom("", "1", "4", "5"); + PutMappingRequest request = new PutMappingRequest().indices("index_2").type("doc").source("t2", "type=keyword").origin(origin); + Exception e = expectThrows(IllegalStateException.class, () -> client().admin().indices().putMapping(request).actionGet()); + assertThat(e.getMessage(), equalTo("not allowed: index[index_2] origin[" + origin + "]")); + } + { + PutMappingRequest request = new PutMappingRequest().indices("index_2").origin(randomFrom("2", "3")) + .type("doc").source("t1", "type=keyword"); + assertAcked(client().admin().indices().putMapping(request).actionGet()); + } + + { + String origin = randomFrom("", "1", "3", "4"); + PutMappingRequest request = new PutMappingRequest().indices("*").type("doc").source("t3", "type=keyword").origin(origin); + Exception e = expectThrows(IllegalStateException.class, () -> client().admin().indices().putMapping(request).actionGet()); + assertThat(e.getMessage(), containsString("not allowed:")); + } + { + PutMappingRequest request = new PutMappingRequest().indices("index_2").origin("2") + .type("doc").source("t3", "type=keyword"); + assertAcked(client().admin().indices().putMapping(request).actionGet()); + } + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index 8ab9e396b4d..acda8d06dc5 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -9,6 +9,7 @@ package org.elasticsearch.xpack.ccr; import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.admin.indices.mapping.put.MappingRequestValidator; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -45,6 +46,7 @@ import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator; +import org.elasticsearch.xpack.ccr.action.CcrRequests; import org.elasticsearch.xpack.ccr.action.ShardChangesAction; import org.elasticsearch.xpack.ccr.action.ShardFollowTask; import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor; @@ -312,4 +314,9 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E protected XPackLicenseState getLicenseState() { return XPackPlugin.getSharedLicenseState(); } + @Override + public Collection mappingRequestValidators() { + return Collections.singletonList(CcrRequests.CCR_PUT_MAPPING_REQUEST_VALIDATOR); + } + } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CcrRequests.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CcrRequests.java index 12432c740a7..87d913c3376 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CcrRequests.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CcrRequests.java @@ -5,10 +5,20 @@ */ package org.elasticsearch.xpack.ccr.action; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.action.admin.indices.mapping.put.MappingRequestValidator; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.Index; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.xpack.ccr.CcrSettings; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; public final class CcrRequests { @@ -24,8 +34,27 @@ public final class CcrRequests { public static PutMappingRequest putMappingRequest(String followerIndex, MappingMetaData mappingMetaData) { PutMappingRequest putMappingRequest = new PutMappingRequest(followerIndex); + putMappingRequest.origin("ccr"); putMappingRequest.type(mappingMetaData.type()); putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON); return putMappingRequest; } + + public static final MappingRequestValidator CCR_PUT_MAPPING_REQUEST_VALIDATOR = (request, state, indices) -> { + if (request.origin() == null) { + return null; // a put-mapping-request on old versions does not have origin. + } + final List followingIndices = Arrays.stream(indices) + .filter(index -> { + final IndexMetaData indexMetaData = state.metaData().index(index); + return indexMetaData != null && CcrSettings.CCR_FOLLOWING_INDEX_SETTING.get(indexMetaData.getSettings()); + }).collect(Collectors.toList()); + if (followingIndices.isEmpty() == false && "ccr".equals(request.origin()) == false) { + final String errorMessage = "can't put mapping to the following indices " + + "[" + followingIndices.stream().map(Index::getName).collect(Collectors.joining(", ")) + "]; " + + "the mapping of the following indices are self-replicated from its leader indices"; + return new ElasticsearchStatusException(errorMessage, RestStatus.FORBIDDEN); + } + return null; + }; } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index 857445ad88d..e811480e1b1 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ccr; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; @@ -46,6 +47,7 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.xpack.CcrIntegTestCase; import org.elasticsearch.xpack.ccr.action.ShardFollowTask; @@ -209,6 +211,25 @@ public class IndexFollowingIT extends CcrIntegTestCase { assertThat(XContentMapValues.extractValue("properties.k", mappingMetaData.sourceAsMap()), nullValue()); } + public void testDoNotAllowPutMappingToFollower() throws Exception { + final String leaderIndexSettings = getIndexSettings(between(1, 2), between(0, 1), + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(leaderClient().admin().indices().prepareCreate("index-1").setSource(leaderIndexSettings, XContentType.JSON)); + followerClient().execute(PutFollowAction.INSTANCE, putFollow("index-1", "index-2")).get(); + PutMappingRequest putMappingRequest = new PutMappingRequest("index-2").type("doc").source("new_field", "type=keyword"); + ElasticsearchStatusException forbiddenException = expectThrows(ElasticsearchStatusException.class, + () -> followerClient().admin().indices().putMapping(putMappingRequest).actionGet()); + assertThat(forbiddenException.getMessage(), + equalTo("can't put mapping to the following indices [index-2]; " + + "the mapping of the following indices are self-replicated from its leader indices")); + assertThat(forbiddenException.status(), equalTo(RestStatus.FORBIDDEN)); + pauseFollow("index-2"); + followerClient().admin().indices().close(new CloseIndexRequest("index-2")).actionGet(); + assertAcked(followerClient().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request("index-2")).actionGet()); + followerClient().admin().indices().open(new OpenIndexRequest("index-2")).actionGet(); + assertAcked(followerClient().admin().indices().putMapping(putMappingRequest).actionGet()); + } + public void testFollowIndex_backlog() throws Exception { int numberOfShards = between(1, 5); String leaderIndexSettings = getIndexSettings(numberOfShards, between(0, 1), diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java index 2b19eea5b56..1dd07a5df81 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.core; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.admin.indices.mapping.put.MappingRequestValidator; import org.elasticsearch.action.support.ActionFilter; import org.elasticsearch.bootstrap.BootstrapCheck; import org.elasticsearch.client.Client; @@ -427,6 +428,12 @@ public class LocalStateCompositeXPackPlugin extends XPackPlugin implements Scrip } } + @Override + public Collection mappingRequestValidators() { + return filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.mappingRequestValidators().stream()) + .collect(Collectors.toList()); + } + private List filterPlugins(Class type) { return plugins.stream().filter(x -> type.isAssignableFrom(x.getClass())).map(p -> ((T)p)) .collect(Collectors.toList());