Enrich validate nested mappings (#42452)

Ensures that fields retained in an enrich index from a source are not contained
under a nested field. It additionally makes sure that key fields exist, and that
value fields are checked if they are present. The policy runner test has also
been expanded with some faulty mapping test cases.
This commit is contained in:
James Baiera 2019-06-03 14:58:06 -04:00
parent 215170b6c3
commit 415f1a484f
2 changed files with 860 additions and 182 deletions

View File

@ -33,6 +33,7 @@ import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -99,6 +100,11 @@ public class EnrichPolicyRunner implements Runnable {
private Map<String, Object> getMappings(final GetIndexResponse getIndexResponse, final String sourceIndexName) { private Map<String, Object> getMappings(final GetIndexResponse getIndexResponse, final String sourceIndexName) {
ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings = getIndexResponse.mappings(); ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings = getIndexResponse.mappings();
ImmutableOpenMap<String, MappingMetaData> indexMapping = mappings.get(sourceIndexName); ImmutableOpenMap<String, MappingMetaData> indexMapping = mappings.get(sourceIndexName);
if (indexMapping.keys().size() == 0) {
throw new ElasticsearchException(
"Enrich policy execution for [{}] failed. No mapping available on source [{}] included in [{}]",
policyName, sourceIndexName, policy.getIndices());
}
assert indexMapping.keys().size() == 1 : "Expecting only one type per index"; assert indexMapping.keys().size() == 1 : "Expecting only one type per index";
MappingMetaData typeMapping = indexMapping.iterator().next().value; MappingMetaData typeMapping = indexMapping.iterator().next().value;
return typeMapping.sourceAsMap(); return typeMapping.sourceAsMap();
@ -109,19 +115,74 @@ public class EnrichPolicyRunner implements Runnable {
logger.debug("Policy [{}]: Validating [{}] source mappings", policyName, sourceIndices); logger.debug("Policy [{}]: Validating [{}] source mappings", policyName, sourceIndices);
for (String sourceIndex : sourceIndices) { for (String sourceIndex : sourceIndices) {
Map<String, Object> mapping = getMappings(getIndexResponse, sourceIndex); Map<String, Object> mapping = getMappings(getIndexResponse, sourceIndex);
Map<?, ?> properties = ((Map<?, ?>) mapping.get("properties")); // First ensure mapping is set
if (properties == null) { if (mapping.get("properties") == null) {
listener.onFailure( throw new ElasticsearchException(
new ElasticsearchException(
"Enrich policy execution for [{}] failed. Could not read mapping for source [{}] included by pattern [{}]", "Enrich policy execution for [{}] failed. Could not read mapping for source [{}] included by pattern [{}]",
policyName, sourceIndex, policy.getIndices())); policyName, sourceIndex, policy.getIndices());
} }
if (properties.containsKey(policy.getEnrichKey()) == false) { // Validate the key and values
listener.onFailure( try {
new ElasticsearchException( validateField(mapping, policy.getEnrichKey(), true);
"Enrich policy execution for [{}] failed. Could not locate enrich key field [{}] on mapping for index [{}]", for (String valueFieldName : policy.getEnrichValues()) {
policyName, policy.getEnrichKey(), sourceIndex)); validateField(mapping, valueFieldName, false);
} }
} catch (ElasticsearchException e) {
throw new ElasticsearchException(
"Enrich policy execution for [{}] failed while validating field mappings for index [{}]",
e, policyName, sourceIndex);
}
}
}
private void validateField(Map<?, ?> properties, String fieldName, boolean fieldRequired) {
assert Strings.isEmpty(fieldName) == false: "Field name cannot be null or empty";
String[] fieldParts = fieldName.split("\\.");
StringBuilder parent = new StringBuilder();
Map<?, ?> currentField = properties;
boolean onRoot = true;
for (String fieldPart : fieldParts) {
// Ensure that the current field is of object type only (not a nested type or a non compound field)
Object type = currentField.get("type");
if (type != null && "object".equals(type) == false) {
throw new ElasticsearchException(
"Could not traverse mapping to field [{}]. The [{}] field must be regular object but was [{}].",
fieldName,
onRoot ? "root" : parent.toString(),
type
);
}
Map<?, ?> currentProperties = ((Map<?, ?>) currentField.get("properties"));
if (currentProperties == null) {
if (fieldRequired) {
throw new ElasticsearchException(
"Could not traverse mapping to field [{}]. Expected the [{}] field to have sub fields but none were configured.",
fieldName,
onRoot ? "root" : parent.toString()
);
} else {
return;
}
}
currentField = ((Map<?, ?>) currentProperties.get(fieldPart));
if (currentField == null) {
if (fieldRequired) {
throw new ElasticsearchException(
"Could not traverse mapping to field [{}]. Could not find the [{}] field under [{}]",
fieldName,
fieldPart,
onRoot ? "root" : parent.toString()
);
} else {
return;
}
}
if (onRoot) {
onRoot = false;
} else {
parent.append(".");
}
parent.append(fieldPart);
} }
} }