Make ModelDebugOutput a result type (elastic/elasticsearch#484)

* Make ModelDebugOutput a result type

* Delete unused ElasticsearchBatchedModelDebugOutputIterator

* Add result_type field to ModelDebugOutput

* Address review comments

Original commit: elastic/x-pack-elasticsearch@a48e4cd946
This commit is contained in:
David Kyle 2016-12-08 15:05:06 +00:00 committed by GitHub
parent 581b1be217
commit a55944b284
8 changed files with 48 additions and 158 deletions

View File

@ -1,42 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.persistence;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutput;
import java.io.IOException;
/**
 * A batched-documents iterator over the {@code ModelDebugOutput} documents of a
 * single job. Each search hit is deserialized from its JSON source and stamped
 * with the hit's document ID so callers can correlate results back to the index.
 */
class ElasticsearchBatchedModelDebugOutputIterator extends ElasticsearchBatchedDocumentsIterator<ModelDebugOutput> {
    /**
     * @param jobId the job whose model debug output documents are iterated;
     *              resolved to the job's result index via
     *              {@link JobResultsPersister#getJobIndexName(String)}
     */
    public ElasticsearchBatchedModelDebugOutputIterator(Client client, String jobId, ParseFieldMatcher parserFieldMatcher) {
        super(client, JobResultsPersister.getJobIndexName(jobId), parserFieldMatcher);
    }

    @Override
    protected String getType() {
        return ModelDebugOutput.TYPE.getPreferredName();
    }

    @Override
    protected ModelDebugOutput map(SearchHit hit) {
        BytesReference source = hit.getSourceRef();
        XContentParser parser;
        try {
            parser = XContentFactory.xContent(source).createParser(source);
        } catch (IOException e) {
            // Wrap low-level parse failures in the standard Elasticsearch parse exception.
            throw new ElasticsearchParseException("failed to parse model debug output", e);
        }
        ModelDebugOutput result = ModelDebugOutput.PARSER.apply(parser, () -> parseFieldMatcher);
        // Preserve the document ID so the result can be updated/deleted later.
        result.setId(hit.getId());
        return result;
    }
}

View File

@ -197,6 +197,20 @@ public class ElasticsearchMappings {
.field(TYPE, DOUBLE) .field(TYPE, DOUBLE)
.endObject() .endObject()
.endObject() .endObject()
.endObject()
// Model Debug Output
.startObject(ModelDebugOutput.DEBUG_FEATURE.getPreferredName())
.field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(ModelDebugOutput.DEBUG_LOWER.getPreferredName())
.field(TYPE, DOUBLE).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(ModelDebugOutput.DEBUG_UPPER.getPreferredName())
.field(TYPE, DOUBLE).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(ModelDebugOutput.DEBUG_MEDIAN.getPreferredName())
.field(TYPE, DOUBLE).field(INCLUDE_IN_ALL, false)
.endObject(); .endObject();
addAnomalyRecordFieldsToMapping(builder); addAnomalyRecordFieldsToMapping(builder);
@ -612,67 +626,6 @@ public class ElasticsearchMappings {
return builder; return builder;
} }
/**
* Mapping for model debug output
*
* @param termFieldNames Optionally, other field names to include in the
* mappings. Pass <code>null</code> if not required.
*/
public static XContentBuilder modelDebugOutputMapping(Collection<String> termFieldNames) throws IOException {
XContentBuilder builder = jsonBuilder()
.startObject()
.startObject(ModelDebugOutput.TYPE.getPreferredName())
.startObject(ALL)
.field(ANALYZER, WHITESPACE)
.endObject()
.startObject(PROPERTIES)
.startObject(Job.ID.getPreferredName())
.field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(ES_TIMESTAMP)
.field(TYPE, DATE).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(ModelDebugOutput.PARTITION_FIELD_VALUE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(ModelDebugOutput.OVER_FIELD_VALUE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(ModelDebugOutput.BY_FIELD_VALUE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(ModelDebugOutput.DEBUG_FEATURE.getPreferredName())
.field(TYPE, KEYWORD).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(ModelDebugOutput.DEBUG_LOWER.getPreferredName())
.field(TYPE, DOUBLE).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(ModelDebugOutput.DEBUG_UPPER.getPreferredName())
.field(TYPE, DOUBLE).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(ModelDebugOutput.DEBUG_MEDIAN.getPreferredName())
.field(TYPE, DOUBLE).field(INCLUDE_IN_ALL, false)
.endObject()
.startObject(ModelDebugOutput.ACTUAL.getPreferredName())
.field(TYPE, DOUBLE).field(INCLUDE_IN_ALL, false)
.endObject();
if (termFieldNames != null) {
ElasticsearchDotNotationReverser reverser = new ElasticsearchDotNotationReverser();
for (String fieldName : termFieldNames) {
reverser.add(fieldName, "");
}
for (Map.Entry<String, Object> entry : reverser.getMappingsMap().entrySet()) {
builder.field(entry.getKey(), entry.getValue());
}
}
return builder
.endObject()
.endObject()
.endObject();
}
/** /**
* The Elasticsearch mappings for the usage documents * The Elasticsearch mappings for the usage documents
*/ */

View File

@ -187,7 +187,6 @@ public class JobProvider {
XContentBuilder quantilesMapping = ElasticsearchMappings.quantilesMapping(); XContentBuilder quantilesMapping = ElasticsearchMappings.quantilesMapping();
XContentBuilder modelStateMapping = ElasticsearchMappings.modelStateMapping(); XContentBuilder modelStateMapping = ElasticsearchMappings.modelStateMapping();
XContentBuilder modelSnapshotMapping = ElasticsearchMappings.modelSnapshotMapping(); XContentBuilder modelSnapshotMapping = ElasticsearchMappings.modelSnapshotMapping();
XContentBuilder modelDebugMapping = ElasticsearchMappings.modelDebugOutputMapping(termFields);
XContentBuilder dataCountsMapping = ElasticsearchMappings.dataCountsMapping(); XContentBuilder dataCountsMapping = ElasticsearchMappings.dataCountsMapping();
String jobId = job.getId(); String jobId = job.getId();
@ -200,7 +199,6 @@ public class JobProvider {
createIndexRequest.mapping(Quantiles.TYPE.getPreferredName(), quantilesMapping); createIndexRequest.mapping(Quantiles.TYPE.getPreferredName(), quantilesMapping);
createIndexRequest.mapping(ModelState.TYPE.getPreferredName(), modelStateMapping); createIndexRequest.mapping(ModelState.TYPE.getPreferredName(), modelStateMapping);
createIndexRequest.mapping(ModelSnapshot.TYPE.getPreferredName(), modelSnapshotMapping); createIndexRequest.mapping(ModelSnapshot.TYPE.getPreferredName(), modelSnapshotMapping);
createIndexRequest.mapping(ModelDebugOutput.TYPE.getPreferredName(), modelDebugMapping);
createIndexRequest.mapping(DataCounts.TYPE.getPreferredName(), dataCountsMapping); createIndexRequest.mapping(DataCounts.TYPE.getPreferredName(), dataCountsMapping);
client.admin().indices().create(createIndexRequest, new ActionListener<CreateIndexResponse>() { client.admin().indices().create(createIndexRequest, new ActionListener<CreateIndexResponse>() {
@ -853,16 +851,8 @@ public class JobProvider {
} }
/** /**
* Returns a {@link BatchedDocumentsIterator} that allows querying * Get the persisted quantiles state for the job
* and iterating over a number of ModelDebugOutputs of the given job
*
* @param jobId the id of the job for which model snapshots are requested
* @return a model snapshot {@link BatchedDocumentsIterator}
*/ */
public BatchedDocumentsIterator<ModelDebugOutput> newBatchedModelDebugOutputIterator(String jobId) {
return new ElasticsearchBatchedModelDebugOutputIterator(client, jobId, parseFieldMatcher);
}
public Optional<Quantiles> getQuantiles(String jobId) { public Optional<Quantiles> getQuantiles(String jobId) {
String indexName = JobResultsPersister.getJobIndexName(jobId); String indexName = JobResultsPersister.getJobIndexName(jobId);
try { try {

View File

@ -230,8 +230,8 @@ public class JobResultsPersister extends AbstractComponent {
* @param category The category to be persisted * @param category The category to be persisted
*/ */
public void persistCategoryDefinition(CategoryDefinition category) { public void persistCategoryDefinition(CategoryDefinition category) {
Persistable persistable = new Persistable(category.getJobId(), category, CategoryDefinition.TYPE::getPreferredName, Persistable persistable = new Persistable(category.getJobId(), category, CategoryDefinition.TYPE.getPreferredName(),
() -> String.valueOf(category.getCategoryId()), () -> toXContentBuilder(category)); String.valueOf(category.getCategoryId()));
persistable.persist(); persistable.persist();
// Don't commit as we expect masses of these updates and they're not // Don't commit as we expect masses of these updates and they're not
// read again by this process // read again by this process
@ -241,8 +241,8 @@ public class JobResultsPersister extends AbstractComponent {
* Persist the quantiles * Persist the quantiles
*/ */
public void persistQuantiles(Quantiles quantiles) { public void persistQuantiles(Quantiles quantiles) {
Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.TYPE::getPreferredName, Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.TYPE.getPreferredName(),
() -> Quantiles.QUANTILES_ID, () -> toXContentBuilder(quantiles)); Quantiles.QUANTILES_ID);
if (persistable.persist()) { if (persistable.persist()) {
// Refresh the index when persisting quantiles so that previously // Refresh the index when persisting quantiles so that previously
// persisted results will be available for searching. Do this using the // persisted results will be available for searching. Do this using the
@ -257,8 +257,8 @@ public class JobResultsPersister extends AbstractComponent {
* Persist a model snapshot description * Persist a model snapshot description
*/ */
public void persistModelSnapshot(ModelSnapshot modelSnapshot) { public void persistModelSnapshot(ModelSnapshot modelSnapshot) {
Persistable persistable = new Persistable(modelSnapshot.getJobId(), modelSnapshot, ModelSnapshot.TYPE::getPreferredName, Persistable persistable = new Persistable(modelSnapshot.getJobId(), modelSnapshot, ModelSnapshot.TYPE.getPreferredName(),
modelSnapshot::getSnapshotId, () -> toXContentBuilder(modelSnapshot)); modelSnapshot.getSnapshotId());
persistable.persist(); persistable.persist();
} }
@ -268,11 +268,10 @@ public class JobResultsPersister extends AbstractComponent {
public void persistModelSizeStats(ModelSizeStats modelSizeStats) { public void persistModelSizeStats(ModelSizeStats modelSizeStats) {
String jobId = modelSizeStats.getJobId(); String jobId = modelSizeStats.getJobId();
logger.trace("[{}] Persisting model size stats, for size {}", jobId, modelSizeStats.getModelBytes()); logger.trace("[{}] Persisting model size stats, for size {}", jobId, modelSizeStats.getModelBytes());
Persistable persistable = new Persistable(modelSizeStats.getJobId(), modelSizeStats, Result.TYPE::getPreferredName, Persistable persistable = new Persistable(modelSizeStats.getJobId(), modelSizeStats, Result.TYPE.getPreferredName(),
ModelSizeStats.RESULT_TYPE_FIELD::getPreferredName, () -> toXContentBuilder(modelSizeStats)); ModelSizeStats.RESULT_TYPE_FIELD.getPreferredName());
persistable.persist(); persistable.persist();
persistable = new Persistable(modelSizeStats.getJobId(), modelSizeStats, Result.TYPE::getPreferredName, persistable = new Persistable(modelSizeStats.getJobId(), modelSizeStats, Result.TYPE.getPreferredName(), null);
() -> null, () -> toXContentBuilder(modelSizeStats));
persistable.persist(); persistable.persist();
// Don't commit as we expect masses of these updates and they're only // Don't commit as we expect masses of these updates and they're only
// for information at the API level // for information at the API level
@ -282,8 +281,7 @@ public class JobResultsPersister extends AbstractComponent {
* Persist model debug output * Persist model debug output
*/ */
public void persistModelDebugOutput(ModelDebugOutput modelDebugOutput) { public void persistModelDebugOutput(ModelDebugOutput modelDebugOutput) {
Persistable persistable = new Persistable(modelDebugOutput.getJobId(), modelDebugOutput, ModelDebugOutput.TYPE::getPreferredName, Persistable persistable = new Persistable(modelDebugOutput.getJobId(), modelDebugOutput, Result.TYPE.getPreferredName(), null);
() -> null, () -> toXContentBuilder(modelDebugOutput));
persistable.persist(); persistable.persist();
// Don't commit as we expect masses of these updates and they're not // Don't commit as we expect masses of these updates and they're not
// read again by this process // read again by this process
@ -356,44 +354,38 @@ public class JobResultsPersister extends AbstractComponent {
private class Persistable { private class Persistable {
private final String jobId; private final String jobId;
private final Object object; private final ToXContent object;
private final Supplier<String> typeSupplier; private final String type;
private final Supplier<String> idSupplier; private final String id;
private final Serialiser serialiser;
Persistable(String jobId, Object object, Supplier<String> typeSupplier, Supplier<String> idSupplier, Persistable(String jobId, ToXContent object, String type, String id) {
Serialiser serialiser) {
this.jobId = jobId; this.jobId = jobId;
this.object = object; this.object = object;
this.typeSupplier = typeSupplier; this.type = type;
this.idSupplier = idSupplier; this.id = id;
this.serialiser = serialiser;
} }
boolean persist() { boolean persist() {
String type = typeSupplier.get();
String id = idSupplier.get();
if (object == null) { if (object == null) {
logger.warn("[{}] No {} to persist for job ", jobId, type); logger.warn("[{}] No {} to persist for job ", jobId, type);
return false; return false;
} }
logCall(type, id); logCall();
try { try {
String indexName = getJobIndexName(jobId); String indexName = getJobIndexName(jobId);
client.prepareIndex(indexName, type, idSupplier.get()) client.prepareIndex(indexName, type, id)
.setSource(serialiser.serialise()) .setSource(toXContentBuilder(object))
.execute().actionGet(); .execute().actionGet();
return true; return true;
} catch (IOException e) { } catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error writing {}", new Object[]{jobId, typeSupplier.get()}, e)); logger.error(new ParameterizedMessage("[{}] Error writing {}", new Object[]{jobId, type}, e));
return false; return false;
} }
} }
private void logCall(String type, String id) { private void logCall() {
String indexName = getJobIndexName(jobId); String indexName = getJobIndexName(jobId);
if (id != null) { if (id != null) {
logger.trace("[{}] ES API CALL: index type {} to index {} with ID {}", jobId, type, indexName, id); logger.trace("[{}] ES API CALL: index type {} to index {} with ID {}", jobId, type, indexName, id);
@ -402,8 +394,4 @@ public class JobResultsPersister extends AbstractComponent {
} }
} }
} }
private interface Serialiser {
XContentBuilder serialise() throws IOException;
}
} }

View File

@ -519,7 +519,7 @@ public class AnomalyRecord extends ToXContentToBytes implements Writeable {
normalizedProbability, typical, actual, normalizedProbability, typical, actual,
function, functionDescription, fieldName, byFieldName, byFieldValue, correlatedByFieldValue, function, functionDescription, fieldName, byFieldName, byFieldValue, correlatedByFieldValue,
partitionFieldName, partitionFieldValue, overFieldName, overFieldValue, partitionFieldName, partitionFieldValue, overFieldName, overFieldValue,
timestamp, isInterim, causes, influencers, jobId, RESULT_TYPE_VALUE); timestamp, isInterim, causes, influencers, jobId);
} }

View File

@ -41,7 +41,7 @@ public class AutodetectResult extends ToXContentToBytes implements Writeable {
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), ModelSnapshot.PARSER, ModelSnapshot.TYPE); PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), ModelSnapshot.PARSER, ModelSnapshot.TYPE);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), ModelSizeStats.PARSER, PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), ModelSizeStats.PARSER,
ModelSizeStats.RESULT_TYPE_FIELD); ModelSizeStats.RESULT_TYPE_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), ModelDebugOutput.PARSER, ModelDebugOutput.TYPE); PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), ModelDebugOutput.PARSER, ModelDebugOutput.RESULTS_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), CategoryDefinition.PARSER, CategoryDefinition.TYPE); PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), CategoryDefinition.PARSER, CategoryDefinition.TYPE);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), FlushAcknowledgement.PARSER, FlushAcknowledgement.TYPE); PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), FlushAcknowledgement.PARSER, FlushAcknowledgement.TYPE);
} }
@ -156,7 +156,7 @@ public class AutodetectResult extends ToXContentToBytes implements Writeable {
addNullableField(Quantiles.TYPE, quantiles, builder); addNullableField(Quantiles.TYPE, quantiles, builder);
addNullableField(ModelSnapshot.TYPE, modelSnapshot, builder); addNullableField(ModelSnapshot.TYPE, modelSnapshot, builder);
addNullableField(ModelSizeStats.RESULT_TYPE_FIELD, modelSizeStats, builder); addNullableField(ModelSizeStats.RESULT_TYPE_FIELD, modelSizeStats, builder);
addNullableField(ModelDebugOutput.TYPE, modelDebugOutput, builder); addNullableField(ModelDebugOutput.RESULTS_FIELD, modelDebugOutput, builder);
addNullableField(CategoryDefinition.TYPE, categoryDefinition, builder); addNullableField(CategoryDefinition.TYPE, categoryDefinition, builder);
addNullableField(FlushAcknowledgement.TYPE, flushAcknowledgement, builder); addNullableField(FlushAcknowledgement.TYPE, flushAcknowledgement, builder);
builder.endObject(); builder.endObject();

View File

@ -28,7 +28,12 @@ import java.util.Objects;
* the restrictions on Elasticsearch mappings). * the restrictions on Elasticsearch mappings).
*/ */
public class ModelDebugOutput extends ToXContentToBytes implements Writeable { public class ModelDebugOutput extends ToXContentToBytes implements Writeable {
public static final ParseField TYPE = new ParseField("model_debug_output"); /**
* Result type
*/
public static final String RESULT_TYPE_VALUE = "model_debug_output";
public static final ParseField RESULTS_FIELD = new ParseField(RESULT_TYPE_VALUE);
public static final ParseField TIMESTAMP = new ParseField("timestamp"); public static final ParseField TIMESTAMP = new ParseField("timestamp");
public static final ParseField PARTITION_FIELD_NAME = new ParseField("partition_field_name"); public static final ParseField PARTITION_FIELD_NAME = new ParseField("partition_field_name");
public static final ParseField PARTITION_FIELD_VALUE = new ParseField("partition_field_value"); public static final ParseField PARTITION_FIELD_VALUE = new ParseField("partition_field_value");
@ -43,10 +48,11 @@ public class ModelDebugOutput extends ToXContentToBytes implements Writeable {
public static final ParseField ACTUAL = new ParseField("actual"); public static final ParseField ACTUAL = new ParseField("actual");
public static final ConstructingObjectParser<ModelDebugOutput, ParseFieldMatcherSupplier> PARSER = public static final ConstructingObjectParser<ModelDebugOutput, ParseFieldMatcherSupplier> PARSER =
new ConstructingObjectParser<>(TYPE.getPreferredName(), a -> new ModelDebugOutput((String) a[0])); new ConstructingObjectParser<>(RESULT_TYPE_VALUE, a -> new ModelDebugOutput((String) a[0]));
static { static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID); PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID);
PARSER.declareString((modelDebugOutput, s) -> {}, Result.RESULT_TYPE);
PARSER.declareField(ModelDebugOutput::setTimestamp, p -> { PARSER.declareField(ModelDebugOutput::setTimestamp, p -> {
if (p.currentToken() == Token.VALUE_NUMBER) { if (p.currentToken() == Token.VALUE_NUMBER) {
return new Date(p.longValue()); return new Date(p.longValue());
@ -132,6 +138,7 @@ public class ModelDebugOutput extends ToXContentToBytes implements Writeable {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(); builder.startObject();
builder.field(Job.ID.getPreferredName(), jobId); builder.field(Job.ID.getPreferredName(), jobId);
builder.field(Result.RESULT_TYPE.getPreferredName(), RESULT_TYPE_VALUE);
if (timestamp != null) { if (timestamp != null) {
builder.field(TIMESTAMP.getPreferredName(), timestamp.getTime()); builder.field(TIMESTAMP.getPreferredName(), timestamp.getTime());
} }

View File

@ -91,7 +91,6 @@ public class ElasticsearchMappingsTests extends ESTestCase {
overridden.add(CategoryDefinition.TYPE.getPreferredName()); overridden.add(CategoryDefinition.TYPE.getPreferredName());
overridden.add(Job.TYPE); overridden.add(Job.TYPE);
overridden.add(ListDocument.TYPE.getPreferredName()); overridden.add(ListDocument.TYPE.getPreferredName());
overridden.add(ModelDebugOutput.TYPE.getPreferredName());
overridden.add(ModelState.TYPE.getPreferredName()); overridden.add(ModelState.TYPE.getPreferredName());
overridden.add(ModelSizeStats.RESULT_TYPE_FIELD.getPreferredName()); overridden.add(ModelSizeStats.RESULT_TYPE_FIELD.getPreferredName());
overridden.add(ModelSnapshot.TYPE.getPreferredName()); overridden.add(ModelSnapshot.TYPE.getPreferredName());
@ -142,11 +141,6 @@ public class ElasticsearchMappingsTests extends ESTestCase {
parser = new JsonFactory().createParser(inputStream); parser = new JsonFactory().createParser(inputStream);
parseJson(parser, expected); parseJson(parser, expected);
builder = ElasticsearchMappings.modelDebugOutputMapping(null);
inputStream = new BufferedInputStream(new ByteArrayInputStream(builder.string().getBytes(StandardCharsets.UTF_8)));
parser = new JsonFactory().createParser(inputStream);
parseJson(parser, expected);
builder = ElasticsearchMappings.modelSnapshotMapping(); builder = ElasticsearchMappings.modelSnapshotMapping();
inputStream = new BufferedInputStream(new ByteArrayInputStream(builder.string().getBytes(StandardCharsets.UTF_8))); inputStream = new BufferedInputStream(new ByteArrayInputStream(builder.string().getBytes(StandardCharsets.UTF_8)));
parser = new JsonFactory().createParser(inputStream); parser = new JsonFactory().createParser(inputStream);