Added enrich policy definition. (#41003)

Relates to #32789
This commit is contained in:
Martijn van Groningen 2019-04-12 11:14:19 +02:00
parent b66ad34565
commit d01c1f3ba0
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
8 changed files with 665 additions and 1 deletions

View File

@ -22,4 +22,3 @@ run {
// No tests yet: // No tests yet:
integTest.enabled = false integTest.enabled = false
test.enabled = false

View File

@ -0,0 +1,123 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.AbstractNamedDiffable;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.XPackPlugin;
import java.io.IOException;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
/**
* Encapsulates enrich policies as custom metadata inside cluster state.
*/
public final class EnrichMetadata extends AbstractNamedDiffable<MetaData.Custom> implements XPackPlugin.XPackMetaDataCustom {
static final String TYPE = "enrich";
static final ParseField POLICIES = new ParseField("policies");
@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<EnrichMetadata, Void> PARSER = new ConstructingObjectParser<>(
"enrich_metadata",
args -> new EnrichMetadata((Map<String, EnrichPolicy>) args[0])
);
static {
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> {
Map<String, EnrichPolicy> patterns = new HashMap<>();
String fieldName = null;
for (XContentParser.Token token = p.nextToken(); token != XContentParser.Token.END_OBJECT; token = p.nextToken()) {
if (token == XContentParser.Token.FIELD_NAME) {
fieldName = p.currentName();
} else if (token == XContentParser.Token.START_OBJECT) {
patterns.put(fieldName, EnrichPolicy.PARSER.parse(p, c));
} else {
throw new ElasticsearchParseException("unexpected token [" + token + "]");
}
}
return patterns;
}, POLICIES);
}
public static EnrichMetadata fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
private final Map<String, EnrichPolicy> policies;
public EnrichMetadata(StreamInput in) throws IOException {
this(in.readMap(StreamInput::readString, EnrichPolicy::new));
}
public EnrichMetadata(Map<String, EnrichPolicy> policies) {
this.policies = Collections.unmodifiableMap(policies);
}
public Map<String, EnrichPolicy> getPolicies() {
return policies;
}
@Override
public EnumSet<MetaData.XContentContext> context() {
return MetaData.ALL_CONTEXTS;
}
@Override
public Version getMinimalSupportedVersion() {
// NO RELEASE: change when merging enrich & enrich-7.x into master and 7.x respectively:
return Version.V_7_1_0;
}
@Override
public String getWriteableName() {
return TYPE;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(policies, StreamOutput::writeString, (out1, value) -> value.writeTo(out1));
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(POLICIES.getPreferredName());
for (Map.Entry<String, EnrichPolicy> entry : policies.entrySet()) {
builder.startObject(entry.getKey());
builder.value(entry.getValue());
builder.endObject();
}
builder.endObject();
return builder;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
EnrichMetadata that = (EnrichMetadata) o;
return policies.equals(that.policies);
}
@Override
public int hashCode() {
return Objects.hash(policies);
}
}

View File

@ -5,11 +5,24 @@
*/ */
package org.elasticsearch.xpack.enrich; package org.elasticsearch.xpack.enrich;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.ingest.Processor; import org.elasticsearch.ingest.Processor;
import org.elasticsearch.plugins.IngestPlugin; import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List;
import java.util.Map; import java.util.Map;
public class EnrichPlugin extends Plugin implements IngestPlugin { public class EnrichPlugin extends Plugin implements IngestPlugin {
@ -18,4 +31,28 @@ public class EnrichPlugin extends Plugin implements IngestPlugin {
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) { public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
return Collections.emptyMap(); return Collections.emptyMap();
} }
@Override
public Collection<Object> createComponents(Client client,
ClusterService clusterService,
ThreadPool threadPool,
ResourceWatcherService resourceWatcherService,
ScriptService scriptService,
NamedXContentRegistry xContentRegistry,
Environment environment,
NodeEnvironment nodeEnvironment,
NamedWriteableRegistry namedWriteableRegistry) {
return Collections.singleton(new EnrichStore(clusterService));
}
@Override
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
return Collections.singletonList(new NamedWriteableRegistry.Entry(MetaData.Custom.class, EnrichMetadata.TYPE,
EnrichMetadata::new));
}
public List<NamedXContentRegistry.Entry> getNamedXContent() {
return Collections.singletonList(new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(EnrichMetadata.TYPE),
EnrichMetadata::fromXContent));
}
} }

View File

@ -0,0 +1,221 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
* Represents an enrich policy including its configuration.
*/
public final class EnrichPolicy implements Writeable, ToXContentFragment {
static final String EXACT_MATCH_TYPE = "exact_match";
static final String[] SUPPORTED_POLICY_TYPES = new String[]{EXACT_MATCH_TYPE};
static final ParseField TYPE = new ParseField("type");
static final ParseField QUERY = new ParseField("query");
static final ParseField INDEX_PATTERN = new ParseField("index_pattern");
static final ParseField ENRICH_KEY = new ParseField("enrich_key");
static final ParseField ENRICH_VALUES = new ParseField("enrich_values");
static final ParseField SCHEDULE = new ParseField("schedule");
@SuppressWarnings("unchecked")
static final ConstructingObjectParser<EnrichPolicy, Void> PARSER = new ConstructingObjectParser<>("policy",
args -> {
return new EnrichPolicy(
(String) args[0],
(QuerySource) args[1],
(String) args[2],
(String) args[3],
(List<String>) args[4],
(String) args[5]
);
}
);
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), TYPE);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> {
XContentBuilder contentBuilder = XContentBuilder.builder(p.contentType().xContent());
contentBuilder.generator().copyCurrentStructure(p);
return new QuerySource(BytesReference.bytes(contentBuilder), contentBuilder.contentType());
}, QUERY);
PARSER.declareString(ConstructingObjectParser.constructorArg(), INDEX_PATTERN);
PARSER.declareString(ConstructingObjectParser.constructorArg(), ENRICH_KEY);
PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), ENRICH_VALUES);
PARSER.declareString(ConstructingObjectParser.constructorArg(), SCHEDULE);
}
private final String type;
private final QuerySource query;
private final String indexPattern;
private final String enrichKey;
private final List<String> enrichValues;
private final String schedule;
public EnrichPolicy(StreamInput in) throws IOException {
this(
in.readString(),
in.readOptionalWriteable(QuerySource::new),
in.readString(),
in.readString(),
in.readStringList(),
in.readString()
);
}
public EnrichPolicy(String type,
QuerySource query,
String indexPattern,
String enrichKey,
List<String> enrichValues,
String schedule) {
this.type = type;
this.query= query;
this.schedule = schedule;
this.indexPattern = indexPattern;
this.enrichKey = enrichKey;
this.enrichValues = enrichValues;
}
public String getType() {
return type;
}
public QuerySource getQuery() {
return query;
}
public String getIndexPattern() {
return indexPattern;
}
public String getEnrichKey() {
return enrichKey;
}
public List<String> getEnrichValues() {
return enrichValues;
}
public String getSchedule() {
return schedule;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(type);
out.writeOptionalWriteable(query);
out.writeString(indexPattern);
out.writeString(enrichKey);
out.writeStringCollection(enrichValues);
out.writeString(schedule);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(TYPE.getPreferredName(), type);
if (query != null) {
builder.field(QUERY.getPreferredName(), query.getQueryAsMap());
}
builder.field(INDEX_PATTERN.getPreferredName(), indexPattern);
builder.field(ENRICH_KEY.getPreferredName(), enrichKey);
builder.array(ENRICH_VALUES.getPreferredName(), enrichValues.toArray(new String[0]));
builder.field(SCHEDULE.getPreferredName(), schedule);
return builder;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
EnrichPolicy policy = (EnrichPolicy) o;
return type.equals(policy.type) &&
Objects.equals(query, policy.query) &&
indexPattern.equals(policy.indexPattern) &&
enrichKey.equals(policy.enrichKey) &&
enrichValues.equals(policy.enrichValues) &&
schedule.equals(policy.schedule);
}
@Override
public int hashCode() {
return Objects.hash(
type,
query,
indexPattern,
enrichKey,
enrichValues,
schedule
);
}
public String toString() {
return Strings.toString(this);
}
public static class QuerySource implements Writeable {
private final BytesReference query;
private final XContentType contentType;
QuerySource(StreamInput in) throws IOException {
this(in.readBytesReference(), in.readEnum(XContentType.class));
}
public QuerySource(BytesReference query, XContentType contentType) {
this.query = query;
this.contentType = contentType;
}
public BytesReference getQuery() {
return query;
}
public Map<String, Object> getQueryAsMap() {
return XContentHelper.convertToMap(query, true, contentType).v2();
}
public XContentType getContentType() {
return contentType;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBytesReference(query);
out.writeEnum(contentType);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
QuerySource that = (QuerySource) o;
return query.equals(that.query) &&
contentType == that.contentType;
}
@Override
public int hashCode() {
return Objects.hash(query, contentType);
}
}
}

View File

@ -0,0 +1,88 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
/**
* A components that provides access and stores an enrich policy.
*/
public final class EnrichStore {
private final ClusterService clusterService;
EnrichStore(ClusterService clusterService) {
this.clusterService = clusterService;
}
/**
* Adds a new enrich policy or overwrites an existing policy if there is already a policy with the same name.
* This method can only be invoked on the elected master node.
*
* @param name The unique name of the policy
* @param policy The policy to store
* @param handler The handler that gets invoked if policy has been stored or a failure has occurred.
*/
public void putPolicy(String name, EnrichPolicy policy, Consumer<Exception> handler) {
assert clusterService.localNode().isMasterNode();
// TODO: add validation
final Map<String, EnrichPolicy> policies;
final EnrichMetadata enrichMetadata = clusterService.state().metaData().custom(EnrichMetadata.TYPE);
if (enrichMetadata != null) {
// Make a copy, because policies map inside custom metadata is read only:
policies = new HashMap<>(enrichMetadata.getPolicies());
} else {
policies = new HashMap<>();
}
policies.put(name, policy);
clusterService.submitStateUpdateTask("update-enrich-policy", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
MetaData metaData = MetaData.builder(currentState.metaData())
.putCustom(EnrichMetadata.TYPE, new EnrichMetadata(policies))
.build();
return ClusterState.builder(currentState)
.metaData(metaData)
.build();
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
handler.accept(null);
}
@Override
public void onFailure(String source, Exception e) {
handler.accept(e);
}
});
}
/**
* Gets an enrich policy for the provided name if exists or otherwise returns <code>null</code>.
*
* @param name The name of the policy to fetch
* @return enrich policy if exists or <code>null</code> otherwise
*/
public EnrichPolicy getPolicy(String name) {
EnrichMetadata enrichMetadata = clusterService.state().metaData().custom(EnrichMetadata.TYPE);
if (enrichMetadata == null) {
return null;
}
return enrichMetadata.getPolicies().get(name);
}
}

View File

@ -0,0 +1,62 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.AbstractSerializingTestCase;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.xpack.enrich.EnrichPolicyTests.randomEnrichPolicy;
import static org.hamcrest.Matchers.equalTo;
public class EnrichMetadataTests extends AbstractSerializingTestCase<EnrichMetadata> {
@Override
protected EnrichMetadata doParseInstance(XContentParser parser) throws IOException {
return EnrichMetadata.fromXContent(parser);
}
@Override
protected EnrichMetadata createTestInstance() {
return randomEnrichMetadata(randomFrom(XContentType.values()));
}
@Override
protected EnrichMetadata createXContextTestInstance(XContentType xContentType) {
return randomEnrichMetadata(xContentType);
}
private static EnrichMetadata randomEnrichMetadata(XContentType xContentType) {
int numPolicies = randomIntBetween(8, 64);
Map<String, EnrichPolicy> policies = new HashMap<>(numPolicies);
for (int i = 0; i < numPolicies; i++) {
EnrichPolicy policy = randomEnrichPolicy(xContentType);
policies.put(randomAlphaOfLength(8), policy);
}
return new EnrichMetadata(policies);
}
@Override
protected Writeable.Reader<EnrichMetadata> instanceReader() {
return EnrichMetadata::new;
}
@Override
protected void assertEqualInstances(EnrichMetadata expectedInstance, EnrichMetadata newInstance) {
assertNotSame(expectedInstance, newInstance);
assertThat(newInstance.getPolicies().size(), equalTo(expectedInstance.getPolicies().size()));
for (Map.Entry<String, EnrichPolicy> entry : newInstance.getPolicies().entrySet()) {
EnrichPolicy actual = entry.getValue();
EnrichPolicy expected = expectedInstance.getPolicies().get(entry.getKey());
EnrichPolicyTests.assertEqualPolicies(expected, actual);
}
}
}

View File

@ -0,0 +1,96 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.test.AbstractSerializingTestCase;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
public class EnrichPolicyTests extends AbstractSerializingTestCase<EnrichPolicy> {
@Override
protected EnrichPolicy doParseInstance(XContentParser parser) throws IOException {
return EnrichPolicy.PARSER.parse(parser, null);
}
@Override
protected EnrichPolicy createTestInstance() {
return randomEnrichPolicy(randomFrom(XContentType.values()));
}
@Override
protected EnrichPolicy createXContextTestInstance(XContentType xContentType) {
return randomEnrichPolicy(xContentType);
}
static EnrichPolicy randomEnrichPolicy(XContentType xContentType) {
final QueryBuilder queryBuilder;
if (randomBoolean()) {
queryBuilder = new MatchAllQueryBuilder();
} else {
queryBuilder = new TermQueryBuilder(randomAlphaOfLength(4), randomAlphaOfLength(4));
}
final ByteArrayOutputStream out = new ByteArrayOutputStream();
try (XContentBuilder xContentBuilder = XContentFactory.contentBuilder(xContentType, out)) {
XContentBuilder content = queryBuilder.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS);
content.flush();
EnrichPolicy.QuerySource querySource = new EnrichPolicy.QuerySource(new BytesArray(out.toByteArray()), content.contentType());
return new EnrichPolicy(
randomFrom(EnrichPolicy.SUPPORTED_POLICY_TYPES),
randomBoolean() ? querySource : null,
randomAlphaOfLength(4),
randomAlphaOfLength(4),
Arrays.asList(generateRandomStringArray(8, 4, false, false)),
randomAlphaOfLength(4)
);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
@Override
protected Writeable.Reader<EnrichPolicy> instanceReader() {
return EnrichPolicy::new;
}
@Override
protected void assertEqualInstances(EnrichPolicy expectedInstance, EnrichPolicy newInstance) {
assertNotSame(expectedInstance, newInstance);
assertEqualPolicies(expectedInstance, newInstance);
}
static void assertEqualPolicies(EnrichPolicy expectedInstance, EnrichPolicy newInstance) {
assertThat(newInstance.getType(), equalTo(expectedInstance.getType()));
if (newInstance.getQuery() != null) {
// testFromXContent, always shuffles the xcontent and then byte wise the query is different, so we check the parsed version:
assertThat(newInstance.getQuery().getQueryAsMap(), equalTo(expectedInstance.getQuery().getQueryAsMap()));
} else {
assertThat(expectedInstance.getQuery(), nullValue());
}
assertThat(newInstance.getIndexPattern(), equalTo(expectedInstance.getIndexPattern()));
assertThat(newInstance.getEnrichKey(), equalTo(expectedInstance.getEnrichKey()));
assertThat(newInstance.getEnrichValues(), equalTo(expectedInstance.getEnrichValues()));
assertThat(newInstance.getSchedule(), equalTo(expectedInstance.getSchedule()));
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.ESSingleNodeTestCase;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.xpack.enrich.EnrichPolicyTests.randomEnrichPolicy;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
public class EnrichStoreTests extends ESSingleNodeTestCase {
public void testCrud() throws Exception {
EnrichStore enrichStore = new EnrichStore(getInstanceFromNode(ClusterService.class));
EnrichPolicy policy = randomEnrichPolicy(XContentType.JSON);
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> error = new AtomicReference<>();
enrichStore.putPolicy("my-policy", policy, e -> {
error.set(e);
latch.countDown();
});
latch.await();
assertThat(error.get(), nullValue());
EnrichPolicy result = enrichStore.getPolicy("my-policy");
assertThat(result, equalTo(policy));
}
}