Backport: also validate source index at put enrich policy time (#48311)

Backport of: #48254

This changes tests to create a valid
source index prior to creating the enrich policy.
This commit is contained in:
Martijn van Groningen 2019-10-22 07:38:16 +02:00 committed by GitHub
parent d0a4bad95b
commit c09b62d5bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 179 additions and 29 deletions

View File

@ -44,6 +44,8 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static java.util.Collections.singletonMap;
public class EnrichDocumentationIT extends ESRestHighLevelClientTestCase {
@After
@ -59,6 +61,10 @@ public class EnrichDocumentationIT extends ESRestHighLevelClientTestCase {
public void testPutPolicy() throws Exception {
RestHighLevelClient client = highLevelClient();
CreateIndexRequest createIndexRequest = new CreateIndexRequest("users")
.mapping(singletonMap("properties", singletonMap("email", singletonMap("type", "keyword"))));
client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
// tag::enrich-put-policy-request
PutPolicyRequest putPolicyRequest = new PutPolicyRequest(
"users-policy", "match", Arrays.asList("users"),
@ -106,6 +112,10 @@ public class EnrichDocumentationIT extends ESRestHighLevelClientTestCase {
RestHighLevelClient client = highLevelClient();
{
CreateIndexRequest createIndexRequest = new CreateIndexRequest("users")
.mapping(singletonMap("properties", singletonMap("email", singletonMap("type", "keyword"))));
client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
// Add a policy, so that it can be deleted:
PutPolicyRequest putPolicyRequest = new PutPolicyRequest(
"users-policy", "match", Arrays.asList("users"),
@ -158,6 +168,10 @@ public class EnrichDocumentationIT extends ESRestHighLevelClientTestCase {
public void testGetPolicy() throws Exception {
RestHighLevelClient client = highLevelClient();
CreateIndexRequest createIndexRequest = new CreateIndexRequest("users")
.mapping(singletonMap("properties", singletonMap("email", singletonMap("type", "keyword"))));
client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
PutPolicyRequest putPolicyRequest = new PutPolicyRequest(
"users-policy", "match", Collections.singletonList("users"),
"email", Arrays.asList("address", "zip", "city", "state"));
@ -259,8 +273,8 @@ public class EnrichDocumentationIT extends ESRestHighLevelClientTestCase {
{
CreateIndexRequest createIndexRequest = new CreateIndexRequest("users")
.mapping(Collections.singletonMap("properties", Collections.singletonMap("email",
Collections.singletonMap("type", "keyword"))));
.mapping(singletonMap("properties", singletonMap("email",
singletonMap("type", "keyword"))));
client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
PutPolicyRequest putPolicyRequest = new PutPolicyRequest(
"users-policy", "match", Collections.singletonList("users"),

View File

@ -12,6 +12,13 @@ Deletes an existing enrich policy and its enrich index.
[source,console]
----
PUT /users
{
"mappings" : {
"properties" : {
"email" : { "type" : "keyword" }
}
}
}
PUT /_enrich/policy/my-policy
{

View File

@ -12,6 +12,13 @@ Returns information about an enrich policy.
[source,console]
----
PUT /users
{
"mappings" : {
"properties" : {
"email" : { "type" : "keyword" }
}
}
}
PUT /_enrich/policy/my-policy
{

View File

@ -12,6 +12,13 @@ Creates an enrich policy.
[source,console]
----
PUT /users
{
"mappings" : {
"properties" : {
"email" : { "type" : "keyword" }
}
}
}
----
////

View File

@ -10,6 +10,7 @@ import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
@ -38,6 +39,15 @@ public abstract class CommonEnrichRestTestCase extends ESRestTestCase {
for (Map<?, ?> entry: policies) {
client().performRequest(new Request("DELETE", "/_enrich/policy/" +
XContentMapValues.extractValue("config.match.name", entry)));
List<?> sourceIndices = (List<?>) XContentMapValues.extractValue("config.match.indices", entry);
for (Object sourceIndex : sourceIndices) {
try {
client().performRequest(new Request("DELETE", "/" + sourceIndex));
} catch (ResponseException e) {
// and that is ok
}
}
}
}
@ -48,6 +58,8 @@ public abstract class CommonEnrichRestTestCase extends ESRestTestCase {
}
private void setupGenericLifecycleTest(boolean deletePipeilne) throws Exception {
// Create source index:
createSourceIndex("my-source-index");
// Create the policy:
Request putPolicyRequest = new Request("PUT", "/_enrich/policy/my_policy");
putPolicyRequest.setJsonEntity(generatePolicySource("my-source-index"));
@ -99,6 +111,7 @@ public abstract class CommonEnrichRestTestCase extends ESRestTestCase {
}
public void testImmutablePolicy() throws IOException {
createSourceIndex("my-source-index");
Request putPolicyRequest = new Request("PUT", "/_enrich/policy/my_policy");
putPolicyRequest.setJsonEntity(generatePolicySource("my-source-index"));
assertOK(client().performRequest(putPolicyRequest));
@ -108,6 +121,7 @@ public abstract class CommonEnrichRestTestCase extends ESRestTestCase {
}
public void testDeleteIsCaseSensitive() throws Exception {
createSourceIndex("my-source-index");
Request putPolicyRequest = new Request("PUT", "/_enrich/policy/my_policy");
putPolicyRequest.setJsonEntity(generatePolicySource("my-source-index"));
assertOK(client().performRequest(putPolicyRequest));
@ -155,6 +169,20 @@ public abstract class CommonEnrichRestTestCase extends ESRestTestCase {
return Strings.toString(source);
}
public static void createSourceIndex(String index) throws IOException {
String mapping = createSourceIndexMapping();
createIndex(index, Settings.EMPTY, mapping);
}
public static String createSourceIndexMapping() {
return "\"properties\":" +
"{\"host\": {\"type\":\"keyword\"}," +
"\"globalRank\":{\"type\":\"keyword\"}," +
"\"tldRank\":{\"type\":\"keyword\"}," +
"\"tld\":{\"type\":\"keyword\"}" +
"}";
}
private static Map<String, Object> toMap(Response response) throws IOException {
return toMap(EntityUtils.toString(response.getEntity()));
}

View File

@ -36,6 +36,9 @@ public class EnrichSecurityIT extends CommonEnrichRestTestCase {
public void testInsufficientPermissionsOnNonExistentIndex() throws Exception {
// This test is here because it requires a valid user that has permission to execute policy PUTs but should fail if the user
// does not have access to read the backing indices used to enrich the data.
Request request = new Request("PUT", "/some-other-index");
request.setJsonEntity("{\n \"mappings\" : {" + createSourceIndexMapping() + "} }");
adminClient().performRequest(request);
Request putPolicyRequest = new Request("PUT", "/_enrich/policy/my_policy");
putPolicyRequest.setJsonEntity(generatePolicySource("some-other-index"));
ResponseException exc = expectThrows(ResponseException.class, () -> client().performRequest(putPolicyRequest));

View File

@ -1,6 +1,20 @@
---
"Test enrich crud apis":
- do:
indices.create:
index: bar
body:
mappings:
properties:
baz:
type: keyword
a:
type: keyword
b:
type: keyword
- is_true: acknowledged
- do:
enrich.put_policy:
name: policy-crud

View File

@ -136,27 +136,34 @@ public class EnrichPolicyRunner implements Runnable {
logger.debug("Policy [{}]: Validating [{}] source mappings", policyName, sourceIndices);
for (String sourceIndex : sourceIndices) {
Map<String, Object> mapping = getMappings(getIndexResponse, sourceIndex);
// First ensure mapping is set
if (mapping.get("properties") == null) {
throw new ElasticsearchException(
"Enrich policy execution for [{}] failed. Could not read mapping for source [{}] included by pattern [{}]",
policyName, sourceIndex, policy.getIndices());
}
// Validate the key and values
try {
validateField(mapping, policy.getMatchField(), true);
for (String valueFieldName : policy.getEnrichFields()) {
validateField(mapping, valueFieldName, false);
}
} catch (ElasticsearchException e) {
throw new ElasticsearchException(
"Enrich policy execution for [{}] failed while validating field mappings for index [{}]",
e, policyName, sourceIndex);
}
validateMappings(policyName, policy, sourceIndex, mapping);
}
}
private void validateField(Map<?, ?> properties, String fieldName, boolean fieldRequired) {
static void validateMappings(final String policyName,
final EnrichPolicy policy,
final String sourceIndex,
final Map<String, Object> mapping) {
// First ensure mapping is set
if (mapping.get("properties") == null) {
throw new ElasticsearchException(
"Enrich policy execution for [{}] failed. Could not read mapping for source [{}] included by pattern [{}]",
policyName, sourceIndex, policy.getIndices());
}
// Validate the key and values
try {
validateField(mapping, policy.getMatchField(), true);
for (String valueFieldName : policy.getEnrichFields()) {
validateField(mapping, valueFieldName, false);
}
} catch (ElasticsearchException e) {
throw new ElasticsearchException(
"Enrich policy execution for [{}] failed while validating field mappings for index [{}]",
e, policyName, sourceIndex);
}
}
private static void validateField(Map<?, ?> properties, String fieldName, boolean fieldRequired) {
assert Strings.isEmpty(fieldName) == false: "Field name cannot be null or empty";
String[] fieldParts = fieldName.split("\\.");
StringBuilder parent = new StringBuilder();

View File

@ -8,8 +8,12 @@ package org.elasticsearch.xpack.enrich;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.service.ClusterService;
@ -40,7 +44,11 @@ public final class EnrichStore {
* @param policy The policy to store
* @param handler The handler that gets invoked if policy has been stored or a failure has occurred.
*/
public static void putPolicy(String name, EnrichPolicy policy, ClusterService clusterService, Consumer<Exception> handler) {
public static void putPolicy(final String name,
final EnrichPolicy policy,
final ClusterService clusterService,
final IndexNameExpressionResolver indexNameExpressionResolver,
final Consumer<Exception> handler) {
assert clusterService.localNode().isMasterNode();
if (Strings.isNullOrEmpty(name)) {
@ -76,6 +84,22 @@ public final class EnrichStore {
finalPolicy = policy;
}
updateClusterState(clusterService, handler, current -> {
for (String indexExpression : finalPolicy.getIndices()) {
// indices field in policy can contain wildcards, aliases etc.
String[] concreteIndices =
indexNameExpressionResolver.concreteIndexNames(current, IndicesOptions.strictExpandOpen(), indexExpression);
for (String concreteIndex : concreteIndices) {
IndexMetaData imd = current.getMetaData().index(concreteIndex);
assert imd != null;
MappingMetaData mapping = imd.mapping();
if (mapping == null) {
throw new IllegalArgumentException("source index [" + concreteIndex + "] has no mapping");
}
Map<String, Object> mappingSource = mapping.getSourceAsMap();
EnrichPolicyRunner.validateMappings(name, finalPolicy, concreteIndex, mappingSource);
}
}
final Map<String, EnrichPolicy> policies = getPolicies(current);
if (policies.get(name) != null) {
throw new ResourceAlreadyExistsException("policy [{}] already exists", name);

View File

@ -102,7 +102,7 @@ public class TransportPutEnrichPolicyAction extends TransportMasterNodeAction<Pu
}
private void putPolicy(PutEnrichPolicyAction.Request request, ActionListener<AcknowledgedResponse> listener ) {
EnrichStore.putPolicy(request.getName(), request.getPolicy(), clusterService, e -> {
EnrichStore.putPolicy(request.getName(), request.getPolicy(), clusterService, indexNameExpressionResolver, e -> {
if (e == null) {
listener.onResponse(new AcknowledgedResponse(true));
} else {

View File

@ -5,6 +5,10 @@
*/
package org.elasticsearch.xpack.enrich;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
@ -24,9 +28,13 @@ public abstract class AbstractEnrichTestCase extends ESSingleNodeTestCase {
protected AtomicReference<Exception> saveEnrichPolicy(String name, EnrichPolicy policy,
ClusterService clusterService) throws InterruptedException {
if (policy != null) {
createSourceIndices(policy);
}
IndexNameExpressionResolver resolver = new IndexNameExpressionResolver();
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> error = new AtomicReference<>();
EnrichStore.putPolicy(name, policy, clusterService, e -> {
EnrichStore.putPolicy(name, policy, clusterService, resolver, e -> {
error.set(e);
latch.countDown();
});
@ -46,4 +54,20 @@ public abstract class AbstractEnrichTestCase extends ESSingleNodeTestCase {
throw error.get();
}
}
protected void createSourceIndices(EnrichPolicy policy) {
createSourceIndices(client(), policy);
}
protected static void createSourceIndices(Client client, EnrichPolicy policy) {
for (String sourceIndex : policy.getIndices()) {
CreateIndexRequest createIndexRequest = new CreateIndexRequest(sourceIndex);
createIndexRequest.mapping("_doc", policy.getMatchField(), "type=keyword");
try {
client.admin().indices().create(createIndexRequest).actionGet();
} catch (ResourceAlreadyExistsException e) {
// and that is okay
}
}
}
}

View File

@ -11,6 +11,7 @@ import java.util.Collections;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Phaser;
@ -24,6 +25,7 @@ import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.json.JsonXContent;
@ -34,6 +36,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.MATCH_TYPE;
import static org.elasticsearch.xpack.enrich.AbstractEnrichTestCase.createSourceIndices;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
@ -114,13 +117,16 @@ public class EnrichPolicyMaintenanceServiceTests extends ESSingleNodeTestCase {
for (int i = 0; i < randomIntBetween(1, 3); i++) {
enrichKeys.add(randomAlphaOfLength(10));
}
return new EnrichPolicy(MATCH_TYPE, null, Collections.singletonList(randomAlphaOfLength(10)), randomAlphaOfLength(10),
String sourceIndex = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
return new EnrichPolicy(MATCH_TYPE, null, Collections.singletonList(sourceIndex), randomAlphaOfLength(10),
enrichKeys);
}
private void addPolicy(String policyName, EnrichPolicy policy) throws InterruptedException {
IndexNameExpressionResolver resolver = new IndexNameExpressionResolver();
createSourceIndices(client(), policy);
doSyncronously((clusterService, exceptionConsumer) ->
EnrichStore.putPolicy(policyName, policy, clusterService, exceptionConsumer));
EnrichStore.putPolicy(policyName, policy, clusterService, resolver, exceptionConsumer));
}
private void removePolicy(String policyName) throws InterruptedException {

View File

@ -22,6 +22,8 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Locale;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
@ -59,7 +61,9 @@ public class EnrichPolicyTests extends AbstractSerializingTestCase<EnrichPolicy>
return new EnrichPolicy(
randomFrom(EnrichPolicy.SUPPORTED_POLICY_TYPES),
randomBoolean() ? querySource : null,
Arrays.asList(generateRandomStringArray(8, 4, false, false)),
Arrays.stream(generateRandomStringArray(8, 4, false, false))
.map(s -> s.toLowerCase(Locale.ROOT))
.collect(Collectors.toList()),
randomAlphaOfLength(4),
Arrays.asList(generateRandomStringArray(8, 4, false, false))
);

View File

@ -25,6 +25,7 @@ import java.util.Collection;
import java.util.Collections;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.enrich.AbstractEnrichTestCase.createSourceIndices;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
@ -41,6 +42,7 @@ public class EnrichPolicyUpdateTests extends ESSingleNodeTestCase {
EnrichPolicy instance1 = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList("index"),
"key1", Collections.singletonList("field1"));
createSourceIndices(client(), instance1);
PutEnrichPolicyAction.Request putPolicyRequest = new PutEnrichPolicyAction.Request("my_policy", instance1);
assertAcked(client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet());
assertThat("Execute failed", client().execute(ExecuteEnrichPolicyAction.INSTANCE,
@ -53,8 +55,9 @@ public class EnrichPolicyUpdateTests extends ESSingleNodeTestCase {
Pipeline pipelineInstance1 = ingestService.getPipeline("1");
assertThat(pipelineInstance1.getProcessors().get(0), instanceOf(MatchProcessor.class));
EnrichPolicy instance2 = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList("index"),
EnrichPolicy instance2 = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Collections.singletonList("index2"),
"key2", Collections.singletonList("field2"));
createSourceIndices(client(), instance2);
ResourceAlreadyExistsException exc = expectThrows(ResourceAlreadyExistsException.class, () ->
client().execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request("my_policy", instance2)).actionGet());
assertTrue(exc.getMessage().contains("policy [my_policy] already exists"));

View File

@ -19,6 +19,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import static org.elasticsearch.xpack.enrich.AbstractEnrichTestCase.createSourceIndices;
import static org.elasticsearch.xpack.enrich.EnrichMultiNodeIT.DECORATE_FIELDS;
import static org.elasticsearch.xpack.enrich.EnrichMultiNodeIT.MATCH_FIELD;
import static org.elasticsearch.xpack.enrich.EnrichMultiNodeIT.POLICY_NAME;
@ -50,6 +51,7 @@ public class EnrichRestartIT extends ESIntegTestCase {
EnrichPolicy enrichPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null,
Collections.singletonList(SOURCE_INDEX_NAME), MATCH_FIELD, Arrays.asList(DECORATE_FIELDS));
createSourceIndices(client(), enrichPolicy);
for (int i = 0; i < numPolicies; i++) {
String policyName = POLICY_NAME + i;
PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);

View File

@ -31,7 +31,7 @@ import static org.hamcrest.core.IsInstanceOf.instanceOf;
public class TransportDeleteEnrichPolicyActionTests extends AbstractEnrichTestCase {
@After
private void cleanupPolicy() {
public void cleanupPolicy() {
ClusterService clusterService = getInstanceFromNode(ClusterService.class);
String name = "my-policy";

View File

@ -26,7 +26,7 @@ import static org.hamcrest.Matchers.nullValue;
public class TransportGetEnrichPolicyActionTests extends AbstractEnrichTestCase {
@After
private void cleanupPolicies() throws InterruptedException {
public void cleanupPolicies() throws InterruptedException {
ClusterService clusterService = getInstanceFromNode(ClusterService.class);
final CountDownLatch latch = new CountDownLatch(1);