[ML] Make Inference processor field_map and inference_config optional (#59010)

Relaxes the requirement that the inference ingest processor must has a
field_map and inference_config defined even if they are empty.
This commit is contained in:
David Kyle 2020-07-06 11:35:30 +01:00 committed by GitHub
parent 0fc12194bf
commit c651135562
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 271 additions and 144 deletions

View File

@ -3,7 +3,7 @@
[[inference-processor]]
=== {infer-cap} Processor
Uses a pre-trained {dfanalytics} model to infer against the data that is being
Uses a pre-trained {dfanalytics} model to infer against the data that is being
ingested in the pipeline.
@ -14,8 +14,8 @@ ingested in the pipeline.
| Name | Required | Default | Description
| `model_id` | yes | - | (String) The ID of the model to load and infer against.
| `target_field` | no | `ml.inference.<processor_tag>` | (String) Field added to incoming documents to contain results objects.
| `field_map` | yes | - | (Object) Maps the document field names to the known field names of the model. This mapping takes precedence over any default mappings provided in the model configuration.
| `inference_config` | yes | - | (Object) Contains the inference type and its options. There are two types: <<inference-processor-regression-opt,`regression`>> and <<inference-processor-classification-opt,`classification`>>.
| `field_map` | no | If defined the model's default field map | (Object) Maps the document field names to the known field names of the model. This mapping takes precedence over any default mappings provided in the model configuration.
| `inference_config` | no | The default settings defined in the model | (Object) Contains the inference type and its options. There are two types: <<inference-processor-regression-opt,`regression`>> and <<inference-processor-classification-opt,`classification`>>.
include::common-options.asciidoc[]
|======
@ -26,7 +26,9 @@ include::common-options.asciidoc[]
"inference": {
"model_id": "flight_delay_regression-1571767128603",
"target_field": "FlightDelayMin_prediction_infer",
"field_map": {},
"field_map": {
"your_field": "my_field"
},
"inference_config": { "regression": {} }
}
}
@ -91,8 +93,8 @@ include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=inference-config-classification
--------------------------------------------------
// NOTCONSOLE
This configuration specifies a `regression` inference and the results are
written to the `my_regression` field contained in the `target_field` results
This configuration specifies a `regression` inference and the results are
written to the `my_regression` field contained in the `target_field` results
object.
@ -110,10 +112,10 @@ object.
--------------------------------------------------
// NOTCONSOLE
This configuration specifies a `classification` inference. The number of
categories for which the predicted probabilities are reported is 2
(`num_top_classes`). The result is written to the `prediction` field and the top
classes to the `probabilities` field. Both fields are contained in the
This configuration specifies a `classification` inference. The number of
categories for which the predicted probabilities are reported is 2
(`num_top_classes`). The result is written to the `prediction` field and the top
classes to the `probabilities` field. Both fields are contained in the
`target_field` results object.
@ -121,8 +123,8 @@ classes to the `probabilities` field. Both fields are contained in the
[[inference-processor-feature-importance]]
==== {feat-imp-cap} object mapping
Update your index mapping of the {feat-imp} result field as you can see below to
get the full benefit of aggregating and searching for
Update your index mapping of the {feat-imp} result field as you can see below to
get the full benefit of aggregating and searching for
{ml-docs}/ml-feature-importance.html[{feat-imp}].
[source,js]
@ -146,7 +148,7 @@ The mapping field name for {feat-imp} is compounded as follows:
`<ml.inference.target_field>`.`<inference.tag>`.`feature_importance`
If `inference.tag` is not provided in the processor definition, it is not part
If `inference.tag` is not provided in the processor definition, it is not part
of the field path. The `<ml.inference.target_field>` defaults to `ml.inference`.
For example, you provide a tag `foo` in the definition as you can see below:
@ -161,7 +163,7 @@ For example, you provide a tag `foo` in the definition as you can see below:
// NOTCONSOLE
The {feat-imp} value is written to the `ml.inference.foo.feature_importance`
The {feat-imp} value is written to the `ml.inference.foo.feature_importance`
field.
You can also specify a target field as follows:
@ -175,5 +177,5 @@ You can also specify a target field as follows:
--------------------------------------------------
// NOTCONSOLE
In this case, {feat-imp} is exposed in the
In this case, {feat-imp} is exposed in the
`my_field.foo.feature_importance` field.

View File

@ -10,7 +10,6 @@ import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.core.ml.inference.TrainedModelConfig;
@ -80,7 +79,7 @@ public class InternalInferModelAction extends ActionType<InternalInferModelActio
if (oldConfig instanceof RegressionConfig) {
this.update = RegressionConfigUpdate.fromConfig((RegressionConfig)oldConfig);
} else if (oldConfig instanceof ClassificationConfig) {
this.update = ClassificationConfigUpdate.fromConfig((ClassificationConfig) oldConfig);
this.update = ClassificationConfigUpdate.fromConfig((ClassificationConfig)oldConfig);
} else {
throw new IOException("Unexpected configuration type [" + oldConfig.getName() + "]");
}
@ -117,7 +116,15 @@ public class InternalInferModelAction extends ActionType<InternalInferModelActio
if (out.getVersion().onOrAfter(Version.V_7_8_0)) {
out.writeNamedWriteable(update);
} else {
out.writeNamedWriteable(update.toConfig());
if (update instanceof RegressionConfigUpdate || update instanceof ClassificationConfigUpdate) {
out.writeNamedWriteable(update.toConfig());
} else {
// This should never happen there are checks higher up against
// sending config update types added after 7.8 to older nodes
throw new UnsupportedOperationException(
"inference config of type [" + update.getName() +
"] cannot be serialized to node of version [" + out.getVersion() + "]");
}
}
out.writeBoolean(previouslyLicensed);
}
@ -143,7 +150,7 @@ public class InternalInferModelAction extends ActionType<InternalInferModelActio
return "Request{" +
"modelId='" + modelId + '\'' +
", objectsToInfer=" + objectsToInfer +
", update=" + Strings.toString(update) +
", update=" + update +
", previouslyLicensed=" + previouslyLicensed +
'}';
}

View File

@ -15,6 +15,7 @@ import org.elasticsearch.xpack.core.ml.inference.results.RegressionInferenceResu
import org.elasticsearch.xpack.core.ml.inference.results.WarningInferenceResults;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.ClassificationConfig;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.ClassificationConfigUpdate;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.EmptyConfigUpdate;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.InferenceConfig;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.InferenceConfigUpdate;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.LenientlyParsedInferenceConfig;
@ -130,8 +131,6 @@ public class MlInferenceNamedXContentProvider implements NamedXContentProvider {
ClassificationConfigUpdate::fromXContentStrict));
namedXContent.add(new NamedXContentRegistry.Entry(InferenceConfigUpdate.class, RegressionConfigUpdate.NAME,
RegressionConfigUpdate::fromXContentStrict));
namedXContent.add(new NamedXContentRegistry.Entry(InferenceConfigUpdate.class, ResultsFieldUpdate.NAME,
ResultsFieldUpdate::fromXContent));
// Inference models
namedXContent.add(new NamedXContentRegistry.Entry(InferenceModel.class, Ensemble.NAME, EnsembleInferenceModel::fromXContent));
@ -198,6 +197,10 @@ public class MlInferenceNamedXContentProvider implements NamedXContentProvider {
ClassificationConfigUpdate.NAME.getPreferredName(), ClassificationConfigUpdate::new));
namedWriteables.add(new NamedWriteableRegistry.Entry(InferenceConfigUpdate.class,
RegressionConfigUpdate.NAME.getPreferredName(), RegressionConfigUpdate::new));
namedWriteables.add(new NamedWriteableRegistry.Entry(InferenceConfigUpdate.class,
ResultsFieldUpdate.NAME, ResultsFieldUpdate::new));
namedWriteables.add(new NamedWriteableRegistry.Entry(InferenceConfigUpdate.class,
EmptyConfigUpdate.NAME, EmptyConfigUpdate::new));
return namedWriteables;
}

View File

@ -12,6 +12,7 @@ import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.NamedXContentObject;
import java.io.IOException;
import java.util.HashMap;
@ -24,9 +25,9 @@ import static org.elasticsearch.xpack.core.ml.inference.trainedmodel.Classificat
import static org.elasticsearch.xpack.core.ml.inference.trainedmodel.ClassificationConfig.RESULTS_FIELD;
import static org.elasticsearch.xpack.core.ml.inference.trainedmodel.ClassificationConfig.TOP_CLASSES_RESULTS_FIELD;
public class ClassificationConfigUpdate implements InferenceConfigUpdate {
public class ClassificationConfigUpdate implements InferenceConfigUpdate, NamedXContentObject {
public static final ParseField NAME = new ParseField("classification");
public static final ParseField NAME = ClassificationConfig.NAME;
public static ClassificationConfigUpdate EMPTY_PARAMS =
new ClassificationConfigUpdate(null, null, null, null, null);

View File

@ -0,0 +1,85 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.inference.trainedmodel;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
public class EmptyConfigUpdate implements InferenceConfigUpdate {
public static final String NAME = "empty";
public static Version minimumSupportedVersion() {
return Version.V_7_9_0;
}
public EmptyConfigUpdate() {
}
public EmptyConfigUpdate(StreamInput in) {
}
@Override
public String getResultsField() {
return null;
}
@Override
public InferenceConfig apply(InferenceConfig originalConfig) {
return originalConfig;
}
@Override
public InferenceConfig toConfig() {
throw new UnsupportedOperationException("the empty config update cannot be rewritten");
}
@Override
public boolean isSupported(InferenceConfig config) {
return true;
}
@Override
public InferenceConfigUpdate.Builder<? extends InferenceConfigUpdate.Builder<?, ?>, ? extends InferenceConfigUpdate> newBuilder() {
return new Builder();
}
@Override
public String getWriteableName() {
return NAME;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
}
@Override
public boolean equals(Object o) {
return o != null && getClass() == o.getClass();
}
@Override
public int hashCode() {
return EmptyConfigUpdate.class.hashCode();
}
public static class Builder implements InferenceConfigUpdate.Builder<Builder, EmptyConfigUpdate> {
@Override
public Builder setResultsField(String resultsField) {
return this;
}
public EmptyConfigUpdate build() {
return new EmptyConfigUpdate();
}
}
}

View File

@ -9,14 +9,13 @@ import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.xpack.core.ml.inference.TrainedModelConfig;
import org.elasticsearch.xpack.core.ml.inference.results.WarningInferenceResults;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.NamedXContentObject;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
public interface InferenceConfigUpdate extends NamedXContentObject, NamedWriteable {
public interface InferenceConfigUpdate extends NamedWriteable {
Set<String> RESERVED_ML_FIELD_NAMES = new HashSet<>(Arrays.asList(
WarningInferenceResults.WARNING.getPreferredName(),
TrainedModelConfig.MODEL_ID.getPreferredName()));
@ -36,6 +35,10 @@ public interface InferenceConfigUpdate extends NamedXContentObject, NamedWriteab
Builder<? extends Builder<?, ?>, ? extends InferenceConfigUpdate> newBuilder();
default String getName() {
return getWriteableName();
}
static void checkFieldUniqueness(String... fieldNames) {
Set<String> duplicatedFieldNames = new HashSet<>();
Set<String> currentFieldNames = new HashSet<>(RESERVED_ML_FIELD_NAMES);
@ -50,7 +53,7 @@ public interface InferenceConfigUpdate extends NamedXContentObject, NamedWriteab
}
}
if (duplicatedFieldNames.isEmpty() == false) {
throw ExceptionsHelper.badRequestException("Cannot apply inference config." +
throw ExceptionsHelper.badRequestException("Invalid inference config." +
" More than one field is configured as {}",
duplicatedFieldNames);
}

View File

@ -12,6 +12,7 @@ import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.NamedXContentObject;
import java.io.IOException;
import java.util.HashMap;
@ -21,9 +22,9 @@ import java.util.Objects;
import static org.elasticsearch.xpack.core.ml.inference.trainedmodel.RegressionConfig.NUM_TOP_FEATURE_IMPORTANCE_VALUES;
import static org.elasticsearch.xpack.core.ml.inference.trainedmodel.RegressionConfig.RESULTS_FIELD;
public class RegressionConfigUpdate implements InferenceConfigUpdate {
public class RegressionConfigUpdate implements InferenceConfigUpdate, NamedXContentObject {
public static final ParseField NAME = new ParseField("regression");
public static final ParseField NAME = RegressionConfig.NAME;
public static RegressionConfigUpdate EMPTY_PARAMS = new RegressionConfigUpdate(null, null);

View File

@ -6,39 +6,20 @@
package org.elasticsearch.xpack.core.ml.inference.trainedmodel;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.xpack.core.ml.inference.trainedmodel.RegressionConfig.RESULTS_FIELD;
/**
* A config update that sets the results field only.
* Supports any type of {@link InferenceConfig}
*/
public class ResultsFieldUpdate implements InferenceConfigUpdate {
public static final ParseField NAME = new ParseField("field_update");
private static final ConstructingObjectParser<ResultsFieldUpdate, Void> PARSER =
new ConstructingObjectParser<>(NAME.getPreferredName(), args -> new ResultsFieldUpdate((String) args[0]));
static {
PARSER.declareString(constructorArg(), RESULTS_FIELD);
}
public static ResultsFieldUpdate fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}
public static final String NAME = "field_update";
private final String resultsField;
@ -86,7 +67,7 @@ public class ResultsFieldUpdate implements InferenceConfigUpdate {
@Override
public String getWriteableName() {
return NAME.getPreferredName();
return NAME;
}
@Override
@ -94,19 +75,6 @@ public class ResultsFieldUpdate implements InferenceConfigUpdate {
out.writeString(resultsField);
}
@Override
public String getName() {
return NAME.getPreferredName();
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
builder.field(RESULTS_FIELD.getPreferredName(), resultsField);
builder.endObject();
return builder;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;

View File

@ -45,11 +45,31 @@ public abstract class AbstractBWCWireSerializationTestCase<T extends Writeable>
for (int runs = 0; runs < NUMBER_OF_TEST_RUNS; runs++) {
T testInstance = createTestInstance();
for (Version bwcVersion : bwcVersions()) {
assertBwcSerialization(testInstance, bwcVersion);
if (isCompatible(testInstance, bwcVersion)) {
assertBwcSerialization(testInstance, bwcVersion);
}
}
}
}
/**
* For the rare occasions where there is no backwards compatibility between
* the specific instance and BWC version override this method to check
* compatibility before asserting the BWC serialization.
*
* The reason for incompatibility may be that earlier node versions are not
* aware of certain features and there is higher level logic to prevent
* streaming to those earlier nodes or the serialization code throws.
* The randomly constructed instance may contain those incompatible features.
*
* @param instance Test instance
* @param version BWC version
* @return True if {@code instance} has wire compatibility with {@code version}
*/
protected boolean isCompatible(T instance, Version version) {
return true;
}
/**
* Assert that instances copied at a particular version are equal. The version is useful
* for sanity checking the backwards compatibility of the wire. It isn't a substitute for

View File

@ -15,11 +15,13 @@ import org.elasticsearch.xpack.core.ml.inference.trainedmodel.ClassificationConf
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.ClassificationConfigTests;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.ClassificationConfigUpdate;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.ClassificationConfigUpdateTests;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.EmptyConfigUpdateTests;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.InferenceConfigUpdate;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.RegressionConfig;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.RegressionConfigTests;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.RegressionConfigUpdate;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.RegressionConfigUpdateTests;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.ResultsFieldUpdateTests;
import java.util.ArrayList;
import java.util.List;
@ -49,7 +51,9 @@ public class InternalInferModelActionRequestTests extends AbstractBWCWireSeriali
private static InferenceConfigUpdate randomInferenceConfigUpdate() {
return randomFrom(RegressionConfigUpdateTests.randomRegressionConfigUpdate(),
ClassificationConfigUpdateTests.randomClassificationConfigUpdate());
ClassificationConfigUpdateTests.randomClassificationConfigUpdate(),
ResultsFieldUpdateTests.randomUpdate(),
EmptyConfigUpdateTests.testInstance());
}
private static Map<String, Object> randomMap() {
@ -70,6 +74,16 @@ public class InternalInferModelActionRequestTests extends AbstractBWCWireSeriali
return new NamedWriteableRegistry(entries);
}
@Override
protected boolean isCompatible(Request instance, Version version) {
if (version.before(Version.V_7_8_0)) {
boolean isSupportedType = instance.getUpdate() instanceof RegressionConfigUpdate ||
instance.getUpdate() instanceof ClassificationConfigUpdate;
return isSupportedType;
}
return true;
}
@Override
@SuppressWarnings("unchecked")
protected Request mutateInstanceForVersion(Request instance, Version version) {
@ -90,5 +104,4 @@ public class InternalInferModelActionRequestTests extends AbstractBWCWireSeriali
}
return instance;
}
}

View File

@ -80,7 +80,7 @@ public class ClassificationConfigUpdateTests extends AbstractBWCSerializationTes
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> new ClassificationConfigUpdate(5, "foo", "foo", 1, PredictionFieldType.BOOLEAN));
assertEquals("Cannot apply inference config. More than one field is configured as [foo]", e.getMessage());
assertEquals("Invalid inference config. More than one field is configured as [foo]", e.getMessage());
}
public void testDuplicateWithResultsField() {

View File

@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.inference.trainedmodel;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
public class EmptyConfigUpdateTests extends AbstractWireSerializingTestCase<EmptyConfigUpdate> {
public static EmptyConfigUpdate testInstance() {
return new EmptyConfigUpdate();
}
@Override
protected Writeable.Reader<EmptyConfigUpdate> instanceReader() {
return EmptyConfigUpdate::new;
}
@Override
protected EmptyConfigUpdate createTestInstance() {
return new EmptyConfigUpdate();
}
public void testToConfig() {
expectThrows(UnsupportedOperationException.class, () -> new EmptyConfigUpdate().toConfig());
}
}

View File

@ -65,7 +65,7 @@ public class RegressionConfigUpdateTests extends AbstractBWCSerializationTestCas
public void testInvalidResultFieldNotUnique() {
ElasticsearchStatusException e =
expectThrows(ElasticsearchStatusException.class, () -> new RegressionConfigUpdate("warning", 0));
assertEquals("Cannot apply inference config. More than one field is configured as [warning]", e.getMessage());
assertEquals("Invalid inference config. More than one field is configured as [warning]", e.getMessage());
}
public void testNewBuilder() {

View File

@ -7,19 +7,15 @@
package org.elasticsearch.xpack.core.ml.inference.trainedmodel;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
import java.io.IOException;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.Mockito.mock;
public class ResultsFieldUpdateTests extends AbstractSerializingTestCase<ResultsFieldUpdate> {
public class ResultsFieldUpdateTests extends AbstractWireSerializingTestCase<ResultsFieldUpdate> {
@Override
protected ResultsFieldUpdate doParseInstance(XContentParser parser) throws IOException {
return ResultsFieldUpdate.fromXContent(parser);
public static ResultsFieldUpdate randomUpdate() {
return new ResultsFieldUpdate(randomAlphaOfLength(4));
}
@Override
@ -29,7 +25,7 @@ public class ResultsFieldUpdateTests extends AbstractSerializingTestCase<Results
@Override
protected ResultsFieldUpdate createTestInstance() {
return new ResultsFieldUpdate(randomAlphaOfLength(4));
return randomUpdate();
}
public void testIsSupported() {

View File

@ -30,6 +30,7 @@ import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.core.ml.action.InternalInferModelAction;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.ClassificationConfig;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.ClassificationConfigUpdate;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.EmptyConfigUpdate;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.InferenceConfig;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.InferenceConfigUpdate;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.RegressionConfig;
@ -40,6 +41,7 @@ import org.elasticsearch.xpack.ml.inference.loadingservice.LocalModel;
import org.elasticsearch.xpack.ml.notifications.InferenceAuditor;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -289,8 +291,22 @@ public class InferenceProcessor extends AbstractProcessor {
LoggingDeprecationHandler.INSTANCE.usedDeprecatedName(null, () -> null, FIELD_MAPPINGS, FIELD_MAP);
}
}
InferenceConfigUpdate inferenceConfig =
inferenceConfigFromMap(ConfigurationUtils.readMap(TYPE, tag, config, INFERENCE_CONFIG));
if (fieldMap == null) {
fieldMap = Collections.emptyMap();
}
InferenceConfigUpdate inferenceConfigUpdate;
Map<String, Object> inferenceConfigMap = ConfigurationUtils.readOptionalMap(TYPE, tag, config, INFERENCE_CONFIG);
if (inferenceConfigMap == null) {
if (minNodeVersion.before(EmptyConfigUpdate.minimumSupportedVersion())) {
// an inference config is required when the empty update is not supported
throw ConfigurationUtils.newConfigurationException(TYPE, tag, INFERENCE_CONFIG, "required property is missing");
}
inferenceConfigUpdate = new EmptyConfigUpdate();
} else {
inferenceConfigUpdate = inferenceConfigUpdateFromMap(inferenceConfigMap);
}
return new InferenceProcessor(client,
auditor,
@ -298,7 +314,7 @@ public class InferenceProcessor extends AbstractProcessor {
description,
targetField,
modelId,
inferenceConfig,
inferenceConfigUpdate,
fieldMap);
}
@ -308,13 +324,13 @@ public class InferenceProcessor extends AbstractProcessor {
this.maxIngestProcessors = maxIngestProcessors;
}
InferenceConfigUpdate inferenceConfigFromMap(Map<String, Object> inferenceConfig) {
ExceptionsHelper.requireNonNull(inferenceConfig, INFERENCE_CONFIG);
if (inferenceConfig.size() != 1) {
InferenceConfigUpdate inferenceConfigUpdateFromMap(Map<String, Object> configMap) {
ExceptionsHelper.requireNonNull(configMap, INFERENCE_CONFIG);
if (configMap.size() != 1) {
throw ExceptionsHelper.badRequestException("{} must be an object with one inference type mapped to an object.",
INFERENCE_CONFIG);
}
Object value = inferenceConfig.values().iterator().next();
Object value = configMap.values().iterator().next();
if ((value instanceof Map<?, ?>) == false) {
throw ExceptionsHelper.badRequestException("{} must be an object with one inference type mapped to an object.",
@ -323,17 +339,17 @@ public class InferenceProcessor extends AbstractProcessor {
@SuppressWarnings("unchecked")
Map<String, Object> valueMap = (Map<String, Object>)value;
if (inferenceConfig.containsKey(ClassificationConfig.NAME.getPreferredName())) {
if (configMap.containsKey(ClassificationConfig.NAME.getPreferredName())) {
checkSupportedVersion(ClassificationConfig.EMPTY_PARAMS);
ClassificationConfigUpdate config = ClassificationConfigUpdate.fromMap(valueMap);
return config;
} else if (inferenceConfig.containsKey(RegressionConfig.NAME.getPreferredName())) {
} else if (configMap.containsKey(RegressionConfig.NAME.getPreferredName())) {
checkSupportedVersion(RegressionConfig.EMPTY_PARAMS);
RegressionConfigUpdate config = RegressionConfigUpdate.fromMap(valueMap);
return config;
} else {
throw ExceptionsHelper.badRequestException("unrecognized inference configuration type {}. Supported types {}",
inferenceConfig.keySet(),
configMap.keySet(),
Arrays.asList(ClassificationConfig.NAME.getPreferredName(), RegressionConfig.NAME.getPreferredName()));
}
}

View File

@ -29,7 +29,6 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.ingest.IngestMetadata;
import org.elasticsearch.ingest.PipelineConfiguration;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.ClassificationConfig;
@ -52,7 +51,6 @@ import static org.mockito.Mockito.when;
public class InferenceProcessorFactoryTests extends ESTestCase {
private Client client;
private XPackLicenseState licenseState;
private ClusterService clusterService;
@Before
@ -70,8 +68,6 @@ public class InferenceProcessorFactoryTests extends ESTestCase {
AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING,
ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING)));
clusterService = new ClusterService(settings, clusterSettings, tp);
licenseState = mock(XPackLicenseState.class);
when(licenseState.checkFeature(XPackLicenseState.Feature.MACHINE_LEARNING)).thenReturn(true);
}
public void testNumInferenceProcessors() throws Exception {
@ -211,15 +207,10 @@ public class InferenceProcessorFactoryTests extends ESTestCase {
Collections.singletonMap(RegressionConfig.NAME.getPreferredName(), Collections.emptyMap()));
}};
try {
processorFactory.create(Collections.emptyMap(), "my_inference_processor", null, regression);
fail("Should not have successfully created");
} catch (ElasticsearchException ex) {
assertThat(ex.getMessage(),
equalTo("Configuration [regression] requires minimum node version [7.6.0] (current minimum node version [7.5.0]"));
} catch (Exception ex) {
fail(ex.getMessage());
}
ElasticsearchException ex = expectThrows(ElasticsearchException.class,
() -> processorFactory.create(Collections.emptyMap(), "my_inference_processor", null, regression));
assertThat(ex.getMessage(),
equalTo("Configuration [regression] requires minimum node version [7.6.0] (current minimum node version [7.5.0]"));
Map<String, Object> classification = new HashMap<String, Object>() {{
put(InferenceProcessor.FIELD_MAP, Collections.emptyMap());
@ -229,15 +220,26 @@ public class InferenceProcessorFactoryTests extends ESTestCase {
Collections.singletonMap(ClassificationConfig.NUM_TOP_CLASSES.getPreferredName(), 1)));
}};
try {
processorFactory.create(Collections.emptyMap(), "my_inference_processor", null, classification);
fail("Should not have successfully created");
} catch (ElasticsearchException ex) {
assertThat(ex.getMessage(),
equalTo("Configuration [classification] requires minimum node version [7.6.0] (current minimum node version [7.5.0]"));
} catch (Exception ex) {
fail(ex.getMessage());
}
ex = expectThrows(ElasticsearchException.class,
() -> processorFactory.create(Collections.emptyMap(), "my_inference_processor", null, classification));
assertThat(ex.getMessage(),
equalTo("Configuration [classification] requires minimum node version [7.6.0] (current minimum node version [7.5.0]"));
}
public void testCreateProcessorWithEmptyConfigNotSupportedOnOldNode() throws IOException {
InferenceProcessor.Factory processorFactory = new InferenceProcessor.Factory(client,
clusterService,
Settings.EMPTY);
processorFactory.accept(builderClusterStateWithModelReferences(Version.V_7_5_0, "model1"));
Map<String, Object> minimalConfig = new HashMap<String, Object>() {{
put(InferenceProcessor.MODEL_ID, "my_model");
put(InferenceProcessor.TARGET_FIELD, "result");
}};
ElasticsearchException ex = expectThrows(ElasticsearchException.class,
() -> processorFactory.create(Collections.emptyMap(), "my_inference_processor", null, minimalConfig));
assertThat(ex.getMessage(), equalTo("[inference_config] required property is missing"));
}
public void testCreateProcessor() {
@ -253,11 +255,8 @@ public class InferenceProcessorFactoryTests extends ESTestCase {
Collections.singletonMap(RegressionConfig.NAME.getPreferredName(), Collections.emptyMap()));
}};
try {
processorFactory.create(Collections.emptyMap(), "my_inference_processor", null, regression);
} catch (Exception ex) {
fail(ex.getMessage());
}
processorFactory.create(Collections.emptyMap(), "my_inference_processor", null, regression);
Map<String, Object> classification = new HashMap<String, Object>() {{
put(InferenceProcessor.FIELD_MAP, Collections.emptyMap());
@ -267,11 +266,14 @@ public class InferenceProcessorFactoryTests extends ESTestCase {
Collections.singletonMap(ClassificationConfig.NUM_TOP_CLASSES.getPreferredName(), 1)));
}};
try {
processorFactory.create(Collections.emptyMap(), "my_inference_processor", null, classification);
} catch (Exception ex) {
fail(ex.getMessage());
}
processorFactory.create(Collections.emptyMap(), "my_inference_processor", null, classification);
Map<String, Object> mininmal = new HashMap<String, Object>() {{
put(InferenceProcessor.MODEL_ID, "my_model");
put(InferenceProcessor.TARGET_FIELD, "result");
}};
processorFactory.create(Collections.emptyMap(), "my_inference_processor", null, mininmal);
}
public void testCreateProcessorWithDuplicateFields() {
@ -287,13 +289,10 @@ public class InferenceProcessorFactoryTests extends ESTestCase {
Collections.singletonMap(RegressionConfig.RESULTS_FIELD.getPreferredName(), "warning")));
}};
try {
processorFactory.create(Collections.emptyMap(), "my_inference_processor", null, regression);
fail("should not have succeeded creating with duplicate fields");
} catch (Exception ex) {
assertThat(ex.getMessage(), equalTo("Cannot apply inference config. " +
"More than one field is configured as [warning]"));
}
Exception ex = expectThrows(Exception.class, () ->
processorFactory.create(Collections.emptyMap(), "my_inference_processor", null, regression));
assertThat(ex.getMessage(), equalTo("Invalid inference config. " +
"More than one field is configured as [warning]"));
}
private static ClusterState buildClusterState(Metadata metadata) {

View File

@ -68,22 +68,6 @@ setup:
}
]
}
- do:
catch: /\[inference_config\] required property is missing/
ingest.put_pipeline:
id: "regression-model-pipeline"
body: >
{
"processors": [
{
"inference" : {
"model_id" : "a-perfect-regression-model",
"target_field": "regression_field",
"field_map": {}
}
}
]
}
---
"Test create processor with deprecated fields":
- skip:
@ -124,7 +108,6 @@ setup:
{
"inference" : {
"model_id" : "a-perfect-regression-model",
"inference_config": {"regression": {}},
"target_field": "regression_field",
"field_map": {}
}
@ -144,8 +127,7 @@ setup:
"inference" : {
"model_id" : "a-perfect-regression-model",
"inference_config": {"regression": {"results_field": "value"}},
"target_field": "regression_field",
"field_map": {}
"target_field": "regression_field"
}
}
]},