diff --git a/x-pack/plugin/enrich/build.gradle b/x-pack/plugin/enrich/build.gradle index 4d047e4d684..d8847dab3c9 100644 --- a/x-pack/plugin/enrich/build.gradle +++ b/x-pack/plugin/enrich/build.gradle @@ -22,4 +22,3 @@ run { // No tests yet: integTest.enabled = false -test.enabled = false diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichMetadata.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichMetadata.java new file mode 100644 index 00000000000..6548fc6514b --- /dev/null +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichMetadata.java @@ -0,0 +1,123 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.enrich; + +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.AbstractNamedDiffable; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.xpack.core.XPackPlugin; + +import java.io.IOException; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * Encapsulates enrich policies as custom metadata inside cluster state. + */ +public final class EnrichMetadata extends AbstractNamedDiffable implements XPackPlugin.XPackMetaDataCustom { + + static final String TYPE = "enrich"; + + static final ParseField POLICIES = new ParseField("policies"); + + @SuppressWarnings("unchecked") + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "enrich_metadata", + args -> new EnrichMetadata((Map) args[0]) + ); + + static { + PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> { + Map patterns = new HashMap<>(); + String fieldName = null; + for (XContentParser.Token token = p.nextToken(); token != XContentParser.Token.END_OBJECT; token = p.nextToken()) { + if (token == XContentParser.Token.FIELD_NAME) { + fieldName = p.currentName(); + } else if (token == XContentParser.Token.START_OBJECT) { + patterns.put(fieldName, EnrichPolicy.PARSER.parse(p, c)); + } else { + throw new ElasticsearchParseException("unexpected token [" + token + "]"); + } + } + return patterns; + }, POLICIES); + } + + public static EnrichMetadata fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + private final Map policies; + + public EnrichMetadata(StreamInput in) throws IOException { + this(in.readMap(StreamInput::readString, EnrichPolicy::new)); + } + + public EnrichMetadata(Map policies) { + this.policies = Collections.unmodifiableMap(policies); + } + + public Map getPolicies() { + return policies; + } + + @Override + public EnumSet context() { + return MetaData.ALL_CONTEXTS; + } + + @Override + public Version getMinimalSupportedVersion() { + // NO RELEASE: change when merging enrich & enrich-7.x into master and 7.x respectively: + return Version.V_7_1_0; + } + + @Override + public String getWriteableName() { + return TYPE; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeMap(policies, StreamOutput::writeString, (out1, value) -> value.writeTo(out1)); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(POLICIES.getPreferredName()); + for (Map.Entry entry : policies.entrySet()) { + builder.startObject(entry.getKey()); + builder.value(entry.getValue()); + builder.endObject(); + } + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + EnrichMetadata that = (EnrichMetadata) o; + return policies.equals(that.policies); + } + + @Override + public int hashCode() { + return Objects.hash(policies); + } + +} diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java index a3b4cb85a19..1180b13c073 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java @@ -5,11 +5,24 @@ */ package org.elasticsearch.xpack.enrich; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.env.Environment; +import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.ingest.Processor; import org.elasticsearch.plugins.IngestPlugin; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.watcher.ResourceWatcherService; +import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; public class EnrichPlugin extends Plugin implements IngestPlugin { @@ -18,4 +31,28 @@ public class EnrichPlugin extends Plugin implements IngestPlugin { public Map getProcessors(Processor.Parameters parameters) { return Collections.emptyMap(); } + + @Override + public Collection createComponents(Client client, + ClusterService clusterService, + ThreadPool threadPool, + ResourceWatcherService resourceWatcherService, + ScriptService scriptService, + NamedXContentRegistry xContentRegistry, + Environment environment, + NodeEnvironment nodeEnvironment, + NamedWriteableRegistry namedWriteableRegistry) { + return Collections.singleton(new EnrichStore(clusterService)); + } + + @Override + public List getNamedWriteables() { + return Collections.singletonList(new NamedWriteableRegistry.Entry(MetaData.Custom.class, EnrichMetadata.TYPE, + EnrichMetadata::new)); + } + + public List getNamedXContent() { + return Collections.singletonList(new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(EnrichMetadata.TYPE), + EnrichMetadata::fromXContent)); + } } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicy.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicy.java new file mode 100644 index 00000000000..ceb3be91af1 --- /dev/null +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicy.java @@ -0,0 +1,221 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.enrich; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContentFragment; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentType; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * Represents an enrich policy including its configuration. + */ +public final class EnrichPolicy implements Writeable, ToXContentFragment { + + static final String EXACT_MATCH_TYPE = "exact_match"; + static final String[] SUPPORTED_POLICY_TYPES = new String[]{EXACT_MATCH_TYPE}; + + static final ParseField TYPE = new ParseField("type"); + static final ParseField QUERY = new ParseField("query"); + static final ParseField INDEX_PATTERN = new ParseField("index_pattern"); + static final ParseField ENRICH_KEY = new ParseField("enrich_key"); + static final ParseField ENRICH_VALUES = new ParseField("enrich_values"); + static final ParseField SCHEDULE = new ParseField("schedule"); + + @SuppressWarnings("unchecked") + static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("policy", + args -> { + return new EnrichPolicy( + (String) args[0], + (QuerySource) args[1], + (String) args[2], + (String) args[3], + (List) args[4], + (String) args[5] + ); + } + ); + + static { + PARSER.declareString(ConstructingObjectParser.constructorArg(), TYPE); + PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> { + XContentBuilder contentBuilder = XContentBuilder.builder(p.contentType().xContent()); + contentBuilder.generator().copyCurrentStructure(p); + return new QuerySource(BytesReference.bytes(contentBuilder), contentBuilder.contentType()); + }, QUERY); + PARSER.declareString(ConstructingObjectParser.constructorArg(), INDEX_PATTERN); + PARSER.declareString(ConstructingObjectParser.constructorArg(), ENRICH_KEY); + PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), ENRICH_VALUES); + PARSER.declareString(ConstructingObjectParser.constructorArg(), SCHEDULE); + } + + private final String type; + private final QuerySource query; + private final String indexPattern; + private final String enrichKey; + private final List enrichValues; + private final String schedule; + + public EnrichPolicy(StreamInput in) throws IOException { + this( + in.readString(), + in.readOptionalWriteable(QuerySource::new), + in.readString(), + in.readString(), + in.readStringList(), + in.readString() + ); + } + + public EnrichPolicy(String type, + QuerySource query, + String indexPattern, + String enrichKey, + List enrichValues, + String schedule) { + this.type = type; + this.query= query; + this.schedule = schedule; + this.indexPattern = indexPattern; + this.enrichKey = enrichKey; + this.enrichValues = enrichValues; + } + + public String getType() { + return type; + } + + public QuerySource getQuery() { + return query; + } + + public String getIndexPattern() { + return indexPattern; + } + + public String getEnrichKey() { + return enrichKey; + } + + public List getEnrichValues() { + return enrichValues; + } + + public String getSchedule() { + return schedule; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(type); + out.writeOptionalWriteable(query); + out.writeString(indexPattern); + out.writeString(enrichKey); + out.writeStringCollection(enrichValues); + out.writeString(schedule); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(TYPE.getPreferredName(), type); + if (query != null) { + builder.field(QUERY.getPreferredName(), query.getQueryAsMap()); + } + builder.field(INDEX_PATTERN.getPreferredName(), indexPattern); + builder.field(ENRICH_KEY.getPreferredName(), enrichKey); + builder.array(ENRICH_VALUES.getPreferredName(), enrichValues.toArray(new String[0])); + builder.field(SCHEDULE.getPreferredName(), schedule); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + EnrichPolicy policy = (EnrichPolicy) o; + return type.equals(policy.type) && + Objects.equals(query, policy.query) && + indexPattern.equals(policy.indexPattern) && + enrichKey.equals(policy.enrichKey) && + enrichValues.equals(policy.enrichValues) && + schedule.equals(policy.schedule); + } + + @Override + public int hashCode() { + return Objects.hash( + type, + query, + indexPattern, + enrichKey, + enrichValues, + schedule + ); + } + + public String toString() { + return Strings.toString(this); + } + + public static class QuerySource implements Writeable { + + private final BytesReference query; + private final XContentType contentType; + + QuerySource(StreamInput in) throws IOException { + this(in.readBytesReference(), in.readEnum(XContentType.class)); + } + + public QuerySource(BytesReference query, XContentType contentType) { + this.query = query; + this.contentType = contentType; + } + + public BytesReference getQuery() { + return query; + } + + public Map getQueryAsMap() { + return XContentHelper.convertToMap(query, true, contentType).v2(); + } + + public XContentType getContentType() { + return contentType; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeBytesReference(query); + out.writeEnum(contentType); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + QuerySource that = (QuerySource) o; + return query.equals(that.query) && + contentType == that.contentType; + } + + @Override + public int hashCode() { + return Objects.hash(query, contentType); + } + } +} diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichStore.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichStore.java new file mode 100644 index 00000000000..918dd9c8fd4 --- /dev/null +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichStore.java @@ -0,0 +1,88 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.enrich; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Consumer; + +/** + * A components that provides access and stores an enrich policy. + */ +public final class EnrichStore { + + private final ClusterService clusterService; + + EnrichStore(ClusterService clusterService) { + this.clusterService = clusterService; + } + + /** + * Adds a new enrich policy or overwrites an existing policy if there is already a policy with the same name. + * This method can only be invoked on the elected master node. + * + * @param name The unique name of the policy + * @param policy The policy to store + * @param handler The handler that gets invoked if policy has been stored or a failure has occurred. + */ + public void putPolicy(String name, EnrichPolicy policy, Consumer handler) { + assert clusterService.localNode().isMasterNode(); + + // TODO: add validation + + final Map policies; + final EnrichMetadata enrichMetadata = clusterService.state().metaData().custom(EnrichMetadata.TYPE); + if (enrichMetadata != null) { + // Make a copy, because policies map inside custom metadata is read only: + policies = new HashMap<>(enrichMetadata.getPolicies()); + } else { + policies = new HashMap<>(); + } + policies.put(name, policy); + clusterService.submitStateUpdateTask("update-enrich-policy", new ClusterStateUpdateTask() { + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + MetaData metaData = MetaData.builder(currentState.metaData()) + .putCustom(EnrichMetadata.TYPE, new EnrichMetadata(policies)) + .build(); + return ClusterState.builder(currentState) + .metaData(metaData) + .build(); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + handler.accept(null); + } + + @Override + public void onFailure(String source, Exception e) { + handler.accept(e); + } + }); + } + + /** + * Gets an enrich policy for the provided name if exists or otherwise returns null. + * + * @param name The name of the policy to fetch + * @return enrich policy if exists or null otherwise + */ + public EnrichPolicy getPolicy(String name) { + EnrichMetadata enrichMetadata = clusterService.state().metaData().custom(EnrichMetadata.TYPE); + if (enrichMetadata == null) { + return null; + } + return enrichMetadata.getPolicies().get(name); + } + +} diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichMetadataTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichMetadataTests.java new file mode 100644 index 00000000000..93efcd35b47 --- /dev/null +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichMetadataTests.java @@ -0,0 +1,62 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.enrich; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.test.AbstractSerializingTestCase; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.elasticsearch.xpack.enrich.EnrichPolicyTests.randomEnrichPolicy; +import static org.hamcrest.Matchers.equalTo; + +public class EnrichMetadataTests extends AbstractSerializingTestCase { + + @Override + protected EnrichMetadata doParseInstance(XContentParser parser) throws IOException { + return EnrichMetadata.fromXContent(parser); + } + + @Override + protected EnrichMetadata createTestInstance() { + return randomEnrichMetadata(randomFrom(XContentType.values())); + } + + @Override + protected EnrichMetadata createXContextTestInstance(XContentType xContentType) { + return randomEnrichMetadata(xContentType); + } + + private static EnrichMetadata randomEnrichMetadata(XContentType xContentType) { + int numPolicies = randomIntBetween(8, 64); + Map policies = new HashMap<>(numPolicies); + for (int i = 0; i < numPolicies; i++) { + EnrichPolicy policy = randomEnrichPolicy(xContentType); + policies.put(randomAlphaOfLength(8), policy); + } + return new EnrichMetadata(policies); + } + + @Override + protected Writeable.Reader instanceReader() { + return EnrichMetadata::new; + } + + @Override + protected void assertEqualInstances(EnrichMetadata expectedInstance, EnrichMetadata newInstance) { + assertNotSame(expectedInstance, newInstance); + assertThat(newInstance.getPolicies().size(), equalTo(expectedInstance.getPolicies().size())); + for (Map.Entry entry : newInstance.getPolicies().entrySet()) { + EnrichPolicy actual = entry.getValue(); + EnrichPolicy expected = expectedInstance.getPolicies().get(entry.getKey()); + EnrichPolicyTests.assertEqualPolicies(expected, actual); + } + } +} diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyTests.java new file mode 100644 index 00000000000..3cfd7ee60d8 --- /dev/null +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyTests.java @@ -0,0 +1,96 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.enrich; + +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.query.MatchAllQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.TermQueryBuilder; +import org.elasticsearch.test.AbstractSerializingTestCase; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; + +public class EnrichPolicyTests extends AbstractSerializingTestCase { + + @Override + protected EnrichPolicy doParseInstance(XContentParser parser) throws IOException { + return EnrichPolicy.PARSER.parse(parser, null); + } + + @Override + protected EnrichPolicy createTestInstance() { + return randomEnrichPolicy(randomFrom(XContentType.values())); + } + + @Override + protected EnrichPolicy createXContextTestInstance(XContentType xContentType) { + return randomEnrichPolicy(xContentType); + } + + static EnrichPolicy randomEnrichPolicy(XContentType xContentType) { + final QueryBuilder queryBuilder; + if (randomBoolean()) { + queryBuilder = new MatchAllQueryBuilder(); + } else { + queryBuilder = new TermQueryBuilder(randomAlphaOfLength(4), randomAlphaOfLength(4)); + } + + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + try (XContentBuilder xContentBuilder = XContentFactory.contentBuilder(xContentType, out)) { + XContentBuilder content = queryBuilder.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS); + content.flush(); + EnrichPolicy.QuerySource querySource = new EnrichPolicy.QuerySource(new BytesArray(out.toByteArray()), content.contentType()); + return new EnrichPolicy( + randomFrom(EnrichPolicy.SUPPORTED_POLICY_TYPES), + randomBoolean() ? querySource : null, + randomAlphaOfLength(4), + randomAlphaOfLength(4), + Arrays.asList(generateRandomStringArray(8, 4, false, false)), + randomAlphaOfLength(4) + ); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + } + + @Override + protected Writeable.Reader instanceReader() { + return EnrichPolicy::new; + } + + @Override + protected void assertEqualInstances(EnrichPolicy expectedInstance, EnrichPolicy newInstance) { + assertNotSame(expectedInstance, newInstance); + assertEqualPolicies(expectedInstance, newInstance); + } + + static void assertEqualPolicies(EnrichPolicy expectedInstance, EnrichPolicy newInstance) { + assertThat(newInstance.getType(), equalTo(expectedInstance.getType())); + if (newInstance.getQuery() != null) { + // testFromXContent, always shuffles the xcontent and then byte wise the query is different, so we check the parsed version: + assertThat(newInstance.getQuery().getQueryAsMap(), equalTo(expectedInstance.getQuery().getQueryAsMap())); + } else { + assertThat(expectedInstance.getQuery(), nullValue()); + } + assertThat(newInstance.getIndexPattern(), equalTo(expectedInstance.getIndexPattern())); + assertThat(newInstance.getEnrichKey(), equalTo(expectedInstance.getEnrichKey())); + assertThat(newInstance.getEnrichValues(), equalTo(expectedInstance.getEnrichValues())); + assertThat(newInstance.getSchedule(), equalTo(expectedInstance.getSchedule())); + } +} diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichStoreTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichStoreTests.java new file mode 100644 index 00000000000..43dfc31ceb9 --- /dev/null +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichStoreTests.java @@ -0,0 +1,38 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.enrich; + +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.test.ESSingleNodeTestCase; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + +import static org.elasticsearch.xpack.enrich.EnrichPolicyTests.randomEnrichPolicy; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; + +public class EnrichStoreTests extends ESSingleNodeTestCase { + + public void testCrud() throws Exception { + EnrichStore enrichStore = new EnrichStore(getInstanceFromNode(ClusterService.class)); + EnrichPolicy policy = randomEnrichPolicy(XContentType.JSON); + + CountDownLatch latch = new CountDownLatch(1); + AtomicReference error = new AtomicReference<>(); + enrichStore.putPolicy("my-policy", policy, e -> { + error.set(e); + latch.countDown(); + }); + latch.await(); + assertThat(error.get(), nullValue()); + + EnrichPolicy result = enrichStore.getPolicy("my-policy"); + assertThat(result, equalTo(policy)); + } + +}