Add the cluster version to enrich policies (#45021)

Adds the Elasticsearch version as a field on the EnrichPolicy object
This commit is contained in:
James Baiera 2019-09-20 11:39:09 -04:00
parent 618fb31be8
commit a349b22273
3 changed files with 62 additions and 14 deletions

View File

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.xpack.core.enrich; package org.elasticsearch.xpack.core.enrich;
import org.elasticsearch.Version;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParsingException; import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
@ -13,6 +14,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
@ -40,6 +42,7 @@ public final class EnrichPolicy implements Writeable, ToXContentFragment {
private static final ParseField INDICES = new ParseField("indices"); private static final ParseField INDICES = new ParseField("indices");
private static final ParseField MATCH_FIELD = new ParseField("match_field"); private static final ParseField MATCH_FIELD = new ParseField("match_field");
private static final ParseField ENRICH_FIELDS = new ParseField("enrich_fields"); private static final ParseField ENRICH_FIELDS = new ParseField("enrich_fields");
private static final ParseField ELASTICSEARCH_VERSION = new ParseField("elasticsearch_version");
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private static final ConstructingObjectParser<EnrichPolicy, String> PARSER = new ConstructingObjectParser<>( private static final ConstructingObjectParser<EnrichPolicy, String> PARSER = new ConstructingObjectParser<>(
@ -50,15 +53,16 @@ public final class EnrichPolicy implements Writeable, ToXContentFragment {
(QuerySource) args[0], (QuerySource) args[0],
(List<String>) args[1], (List<String>) args[1],
(String) args[2], (String) args[2],
(List<String>) args[3] (List<String>) args[3],
(Version) args[4]
) )
); );
static { static {
declareParserOptions(PARSER); declareCommonConstructorParsingOptions(PARSER);
} }
private static void declareParserOptions(ConstructingObjectParser<?, ?> parser) { private static <T> void declareCommonConstructorParsingOptions(ConstructingObjectParser<T, ?> parser) {
parser.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> { parser.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> {
XContentBuilder contentBuilder = XContentBuilder.builder(p.contentType().xContent()); XContentBuilder contentBuilder = XContentBuilder.builder(p.contentType().xContent());
contentBuilder.generator().copyCurrentStructure(p); contentBuilder.generator().copyCurrentStructure(p);
@ -67,6 +71,8 @@ public final class EnrichPolicy implements Writeable, ToXContentFragment {
parser.declareStringArray(ConstructingObjectParser.constructorArg(), INDICES); parser.declareStringArray(ConstructingObjectParser.constructorArg(), INDICES);
parser.declareString(ConstructingObjectParser.constructorArg(), MATCH_FIELD); parser.declareString(ConstructingObjectParser.constructorArg(), MATCH_FIELD);
parser.declareStringArray(ConstructingObjectParser.constructorArg(), ENRICH_FIELDS); parser.declareStringArray(ConstructingObjectParser.constructorArg(), ENRICH_FIELDS);
parser.declareField(ConstructingObjectParser.optionalConstructorArg(), ((p, c) -> Version.fromString(p.text())),
ELASTICSEARCH_VERSION, ValueType.STRING);
} }
public static EnrichPolicy fromXContent(XContentParser parser) throws IOException { public static EnrichPolicy fromXContent(XContentParser parser) throws IOException {
@ -95,6 +101,7 @@ public final class EnrichPolicy implements Writeable, ToXContentFragment {
private final List<String> indices; private final List<String> indices;
private final String matchField; private final String matchField;
private final List<String> enrichFields; private final List<String> enrichFields;
private final Version elasticsearchVersion;
public EnrichPolicy(StreamInput in) throws IOException { public EnrichPolicy(StreamInput in) throws IOException {
this( this(
@ -102,7 +109,8 @@ public final class EnrichPolicy implements Writeable, ToXContentFragment {
in.readOptionalWriteable(QuerySource::new), in.readOptionalWriteable(QuerySource::new),
in.readStringList(), in.readStringList(),
in.readString(), in.readString(),
in.readStringList() in.readStringList(),
Version.readVersion(in)
); );
} }
@ -111,11 +119,21 @@ public final class EnrichPolicy implements Writeable, ToXContentFragment {
List<String> indices, List<String> indices,
String matchField, String matchField,
List<String> enrichFields) { List<String> enrichFields) {
this(type, query, indices, matchField, enrichFields, Version.CURRENT);
}
public EnrichPolicy(String type,
QuerySource query,
List<String> indices,
String matchField,
List<String> enrichFields,
Version elasticsearchVersion) {
this.type = type; this.type = type;
this.query= query; this.query = query;
this.indices = indices; this.indices = indices;
this.matchField = matchField; this.matchField = matchField;
this.enrichFields = enrichFields; this.enrichFields = enrichFields;
this.elasticsearchVersion = elasticsearchVersion != null ? elasticsearchVersion : Version.CURRENT;
} }
public String getType() { public String getType() {
@ -138,6 +156,10 @@ public final class EnrichPolicy implements Writeable, ToXContentFragment {
return enrichFields; return enrichFields;
} }
public Version getElasticsearchVersion() {
return elasticsearchVersion;
}
public static String getBaseName(String policyName) { public static String getBaseName(String policyName) {
return ENRICH_INDEX_NAME_BASE + policyName; return ENRICH_INDEX_NAME_BASE + policyName;
} }
@ -149,25 +171,29 @@ public final class EnrichPolicy implements Writeable, ToXContentFragment {
out.writeStringCollection(indices); out.writeStringCollection(indices);
out.writeString(matchField); out.writeString(matchField);
out.writeStringCollection(enrichFields); out.writeStringCollection(enrichFields);
Version.writeVersion(elasticsearchVersion, out);
} }
@Override @Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(type); builder.startObject(type);
{ {
toInnerXContent(builder); toInnerXContent(builder, params);
} }
builder.endObject(); builder.endObject();
return builder; return builder;
} }
private void toInnerXContent(XContentBuilder builder) throws IOException { private void toInnerXContent(XContentBuilder builder, Params params) throws IOException {
if (query != null) { if (query != null) {
builder.field(QUERY.getPreferredName(), query.getQueryAsMap()); builder.field(QUERY.getPreferredName(), query.getQueryAsMap());
} }
builder.array(INDICES.getPreferredName(), indices.toArray(new String[0])); builder.array(INDICES.getPreferredName(), indices.toArray(new String[0]));
builder.field(MATCH_FIELD.getPreferredName(), matchField); builder.field(MATCH_FIELD.getPreferredName(), matchField);
builder.array(ENRICH_FIELDS.getPreferredName(), enrichFields.toArray(new String[0])); builder.array(ENRICH_FIELDS.getPreferredName(), enrichFields.toArray(new String[0]));
if (params.paramAsBoolean("include_version", false) && elasticsearchVersion != null) {
builder.field(ELASTICSEARCH_VERSION.getPreferredName(), elasticsearchVersion.toString());
}
} }
@Override @Override
@ -179,7 +205,8 @@ public final class EnrichPolicy implements Writeable, ToXContentFragment {
Objects.equals(query, policy.query) && Objects.equals(query, policy.query) &&
indices.equals(policy.indices) && indices.equals(policy.indices) &&
matchField.equals(policy.matchField) && matchField.equals(policy.matchField) &&
enrichFields.equals(policy.enrichFields); enrichFields.equals(policy.enrichFields) &&
elasticsearchVersion.equals(policy.elasticsearchVersion);
} }
@Override @Override
@ -189,7 +216,8 @@ public final class EnrichPolicy implements Writeable, ToXContentFragment {
query, query,
indices, indices,
matchField, matchField,
enrichFields enrichFields,
elasticsearchVersion
); );
} }
@ -257,13 +285,14 @@ public final class EnrichPolicy implements Writeable, ToXContentFragment {
(QuerySource) args[1], (QuerySource) args[1],
(List<String>) args[2], (List<String>) args[2],
(String) args[3], (String) args[3],
(List<String>) args[4]) (List<String>) args[4],
(Version) args[5])
) )
); );
static { static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME); PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME);
declareParserOptions(PARSER); declareCommonConstructorParsingOptions(PARSER);
} }
private final String name; private final String name;
@ -299,7 +328,7 @@ public final class EnrichPolicy implements Writeable, ToXContentFragment {
builder.startObject(policy.type); builder.startObject(policy.type);
{ {
builder.field(NAME.getPreferredName(), name); builder.field(NAME.getPreferredName(), name);
policy.toInnerXContent(builder); policy.toInnerXContent(builder, params);
} }
builder.endObject(); builder.endObject();
builder.endObject(); builder.endObject();

View File

@ -5,8 +5,9 @@
*/ */
package org.elasticsearch.xpack.core.enrich.action; package org.elasticsearch.xpack.core.enrich.action;
import org.elasticsearch.action.ActionType; import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
@ -37,6 +38,10 @@ public class PutEnrichPolicyAction extends ActionType<AcknowledgedResponse> {
public Request(String name, EnrichPolicy policy) { public Request(String name, EnrichPolicy policy) {
this.name = Objects.requireNonNull(name, "name cannot be null"); this.name = Objects.requireNonNull(name, "name cannot be null");
if (!Version.CURRENT.equals(policy.getElasticsearchVersion())) {
throw new IllegalArgumentException("Cannot set [version_created] field on enrich policy [" + name +
"]. Found [" + policy.getElasticsearchVersion() + "]");
}
this.policy = policy; this.policy = policy;
} }

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.enrich;
import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
@ -54,12 +55,25 @@ public final class EnrichStore {
} }
// TODO: add policy validation // TODO: add policy validation
final EnrichPolicy finalPolicy;
if (policy.getElasticsearchVersion() == null) {
finalPolicy = new EnrichPolicy(
policy.getType(),
policy.getQuery(),
policy.getIndices(),
policy.getMatchField(),
policy.getEnrichFields(),
Version.CURRENT
);
} else {
finalPolicy = policy;
}
updateClusterState(clusterService, handler, current -> { updateClusterState(clusterService, handler, current -> {
final Map<String, EnrichPolicy> policies = getPolicies(current); final Map<String, EnrichPolicy> policies = getPolicies(current);
if (policies.get(name) != null) { if (policies.get(name) != null) {
throw new ResourceAlreadyExistsException("policy [{}] already exists", name); throw new ResourceAlreadyExistsException("policy [{}] already exists", name);
} }
policies.put(name, policy); policies.put(name, finalPolicy);
return policies; return policies;
}); });
} }