Split records and influencers from bucket (elastic/elasticsearch#389)
In C++ the results are built all together under a bucket hierarchy. This bucket was written out and Java would read it and split it into its parts: the bucket itself, its records and its influencers. During the migration, the bucket started being persisted as a whole, including its records and influencers. This commit changes that by modifying the way results are written in C++, so that the Java and C++ results writing and reading are back in sync. To achieve this, records and influencers are now written as top-level results from C++. In addition, they are written as array objects so that the Java side can persist them in a bulk request.

* Fix bucket counting in results processor

Original commit: elastic/x-pack-elasticsearch@feadf3f887
This commit is contained in:
parent 9286ef2304
commit 37cd03ad4d
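With this change, each result object the C++ process writes is parsed on the Java side as a single autodetect_result document whose optional fields include the bucket plus top-level records and influencers arrays, and the array form is what lets the Java side index their elements in one bulk request. A rough sketch of one such object follows; the field values are invented and the top-level bucket field name is an assumption here, while the autodetect_result, records and influencers names are taken from the parser changed in this commit:

{ "bucket": { "jobId": "foo", "anomalyScore": 99.9, ... },
  "records": [ { "jobId": "foo", "probability": 0.1, ... } ],
  "influencers": [ { "jobId": "foo", "influencerFieldName": "infName1", ... } ] }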
@@ -48,7 +48,7 @@ public class JobRenormaliser extends AbstractComponent {
String indexName = JobResultsPersister.getJobIndexName(jobId);
logger.trace("[{}] ES API CALL: index type {} to index {} with ID {}", jobId, Bucket.TYPE, indexName, bucket.getId());
client.prepareIndex(indexName, Bucket.TYPE.getPreferredName(), bucket.getId())
- .setSource(jobResultsPersister.serialiseWithJobId(Bucket.TYPE.getPreferredName(), bucket)).execute().actionGet();
+ .setSource(jobResultsPersister.toXContentBuilder(bucket)).execute().actionGet();
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error updating bucket state", new Object[]{jobId}, e));
return;

@@ -88,7 +88,7 @@ public class JobRenormaliser extends AbstractComponent {

bulkRequest.add(
client.prepareIndex(indexName, AnomalyRecord.TYPE.getPreferredName(), recordId)
- .setSource(jobResultsPersister.serialiseWithJobId(AnomalyRecord.TYPE.getPreferredName(), record)));
+ .setSource(jobResultsPersister.toXContentBuilder(record)));

addedAny = true;
}

@@ -69,13 +69,9 @@ public class JobResultsPersister extends AbstractComponent {
* Persist the result bucket
*/
public void persistBucket(Bucket bucket) {
- if (bucket.getRecords() == null) {
- return;
- }
-
String jobId = bucket.getJobId();
try {
- XContentBuilder content = serialiseWithJobId(Bucket.TYPE.getPreferredName(), bucket);
+ XContentBuilder content = toXContentBuilder(bucket);
String indexName = getJobIndexName(jobId);
logger.trace("[{}] ES API CALL: index type {} to index {} at epoch {}", jobId, Bucket.TYPE, indexName, bucket.getEpoch());
IndexResponse response = client.prepareIndex(indexName, Bucket.TYPE.getPreferredName())

@@ -84,43 +80,71 @@ public class JobResultsPersister extends AbstractComponent {
bucket.setId(response.getId());
persistBucketInfluencersStandalone(jobId, bucket.getId(), bucket.getBucketInfluencers(), bucket.getTimestamp(),
bucket.isInterim());
- if (bucket.getInfluencers() != null && bucket.getInfluencers().isEmpty() == false) {
- BulkRequestBuilder addInfluencersRequest = client.prepareBulk();
- for (Influencer influencer : bucket.getInfluencers()) {
- influencer.setTimestamp(bucket.getTimestamp());
- influencer.setInterim(bucket.isInterim());
- content = serialiseWithJobId(Influencer.TYPE.getPreferredName(), influencer);
- logger.trace("[{}] ES BULK ACTION: index type {} to index {} with auto-generated ID",
- jobId, Influencer.TYPE, indexName);
- addInfluencersRequest.add(client.prepareIndex(indexName, Influencer.TYPE.getPreferredName())
- .setSource(content));
- }
- logger.trace("[{}] ES API CALL: bulk request with {} actions", jobId, addInfluencersRequest.numberOfActions());
- BulkResponse addInfluencersResponse = addInfluencersRequest.execute().actionGet();
- if (addInfluencersResponse.hasFailures()) {
- logger.error("[{}] Bulk index of Influencers has errors: {}", jobId, addInfluencersResponse.buildFailureMessage());
- }
- }
- if (bucket.getRecords().isEmpty() == false) {
- BulkRequestBuilder addRecordsRequest = client.prepareBulk();
- for (AnomalyRecord record : bucket.getRecords()) {
- record.setTimestamp(bucket.getTimestamp());
- content = serialiseWithJobId(AnomalyRecord.TYPE.getPreferredName(), record);
- logger.trace("[{}] ES BULK ACTION: index type {} to index {} with auto-generated ID, for bucket {}",
- jobId, AnomalyRecord.TYPE, indexName, bucket.getId());
- addRecordsRequest.add(client.prepareIndex(indexName, AnomalyRecord.TYPE.getPreferredName())
- .setSource(content));
- }
-
- logger.trace("[{}] ES API CALL: bulk request with {} actions", jobId, addRecordsRequest.numberOfActions());
- BulkResponse addRecordsResponse = addRecordsRequest.execute().actionGet();
- if (addRecordsResponse.hasFailures()) {
- logger.error("[{}] Bulk index of AnomalyRecord has errors: {}", jobId, addRecordsResponse.buildFailureMessage());
- }
- }
persistPerPartitionMaxProbabilities(bucket);
} catch (IOException e) {
- logger.error(new ParameterizedMessage("[{}] Error writing bucket state", new Object[] {jobId}, e));
+ logger.error(new ParameterizedMessage("[{}] Error persisting bucket", new Object[] {jobId}, e));
}
}

+ /**
+ * Persist a list of anomaly records
+ * @param records the records to persist
+ */
+ public void persistRecords(List<AnomalyRecord> records) {
+ if (records.isEmpty()) {
+ return;
+ }
+ String jobId = records.get(0).getJobId();
+ String indexName = getJobIndexName(jobId);
+ BulkRequestBuilder addRecordsRequest = client.prepareBulk();
+ XContentBuilder content = null;
+ try {
+ for (AnomalyRecord record : records) {
+ content = toXContentBuilder(record);
+
+ logger.trace("[{}] ES BULK ACTION: index type {} to index {} with auto-generated ID", jobId, AnomalyRecord.TYPE, indexName);
+ addRecordsRequest.add(client.prepareIndex(indexName, AnomalyRecord.TYPE.getPreferredName()).setSource(content));
+ }
+ } catch (IOException e) {
+ logger.error(new ParameterizedMessage("[{}] Error persisting records", new Object[] {jobId}, e));
+ return;
+ }
+
+ logger.trace("[{}] ES API CALL: bulk request with {} actions", jobId, addRecordsRequest.numberOfActions());
+ BulkResponse addRecordsResponse = addRecordsRequest.execute().actionGet();
+ if (addRecordsResponse.hasFailures()) {
+ logger.error("[{}] Bulk index of AnomalyRecord has errors: {}", jobId, addRecordsResponse.buildFailureMessage());
+ }
+ }
+
+ /**
+ * Persist a list of influencers
+ * @param influencers the influencers to persist
+ */
+ public void persistInfluencers(List<Influencer> influencers) {
+ if (influencers.isEmpty()) {
+ return;
+ }
+ String jobId = influencers.get(0).getJobId();
+ String indexName = getJobIndexName(jobId);
+ BulkRequestBuilder addInfluencersRequest = client.prepareBulk();
+ XContentBuilder content = null;
+ try {
+ for (Influencer influencer : influencers) {
+ content = toXContentBuilder(influencer);
+ logger.trace("[{}] ES BULK ACTION: index type {} to index {} with auto-generated ID",
+ jobId, Influencer.TYPE, indexName);
+ addInfluencersRequest.add(client.prepareIndex(indexName, Influencer.TYPE.getPreferredName()).setSource(content));
+ }
+ } catch (IOException e) {
+ logger.error(new ParameterizedMessage("[{}] Error persisting influencers", new Object[] {jobId}, e));
+ return;
+ }
+
+ logger.trace("[{}] ES API CALL: bulk request with {} actions", jobId, addInfluencersRequest.numberOfActions());
+ BulkResponse addInfluencersResponse = addInfluencersRequest.execute().actionGet();
+ if (addInfluencersResponse.hasFailures()) {
+ logger.error("[{}] Bulk index of Influencers has errors: {}", jobId, addInfluencersResponse.buildFailureMessage());
+ }
+ }

@@ -141,7 +165,7 @@ public class JobResultsPersister extends AbstractComponent {
*/
public void persistQuantiles(Quantiles quantiles) {
Persistable persistable = new Persistable(quantiles.getJobId(), quantiles, Quantiles.TYPE::getPreferredName,
- () -> Quantiles.QUANTILES_ID, () -> serialiseWithJobId(Quantiles.TYPE.getPreferredName(), quantiles));
+ () -> Quantiles.QUANTILES_ID, () -> toXContentBuilder(quantiles));
if (persistable.persist()) {
// Refresh the index when persisting quantiles so that previously
// persisted results will be available for searching. Do this using the

@@ -157,7 +181,7 @@ public class JobResultsPersister extends AbstractComponent {
*/
public void persistModelSnapshot(ModelSnapshot modelSnapshot) {
Persistable persistable = new Persistable(modelSnapshot.getJobId(), modelSnapshot, ModelSnapshot.TYPE::getPreferredName,
- modelSnapshot::getSnapshotId, () -> serialiseWithJobId(ModelSnapshot.TYPE.getPreferredName(), modelSnapshot));
+ modelSnapshot::getSnapshotId, () -> toXContentBuilder(modelSnapshot));
persistable.persist();
}

@@ -168,10 +192,10 @@ public class JobResultsPersister extends AbstractComponent {
String jobId = modelSizeStats.getJobId();
logger.trace("[{}] Persisting model size stats, for size {}", jobId, modelSizeStats.getModelBytes());
Persistable persistable = new Persistable(modelSizeStats.getJobId(), modelSizeStats, ModelSizeStats.TYPE::getPreferredName,
- () -> jobId, () -> serialiseWithJobId(ModelSizeStats.TYPE.getPreferredName(), modelSizeStats));
+ () -> jobId, () -> toXContentBuilder(modelSizeStats));
persistable.persist();
persistable = new Persistable(modelSizeStats.getJobId(), modelSizeStats, ModelSizeStats.TYPE::getPreferredName,
- () -> null, () -> serialiseWithJobId(ModelSizeStats.TYPE.getPreferredName(), modelSizeStats));
+ () -> null, () -> toXContentBuilder(modelSizeStats));
persistable.persist();
// Don't commit as we expect masses of these updates and they're only
// for information at the API level

@@ -182,7 +206,7 @@ public class JobResultsPersister extends AbstractComponent {
*/
public void persistModelDebugOutput(ModelDebugOutput modelDebugOutput) {
Persistable persistable = new Persistable(modelDebugOutput.getJobId(), modelDebugOutput, ModelDebugOutput.TYPE::getPreferredName,
- () -> null, () -> serialiseWithJobId(ModelDebugOutput.TYPE.getPreferredName(), modelDebugOutput));
+ () -> null, () -> toXContentBuilder(modelDebugOutput));
persistable.persist();
// Don't commit as we expect masses of these updates and they're not
// read again by this process

@@ -193,7 +217,7 @@ public class JobResultsPersister extends AbstractComponent {
*/
public void persistInfluencer(Influencer influencer) {
Persistable persistable = new Persistable(influencer.getJobId(), influencer, Influencer.TYPE::getPreferredName,
- influencer::getId, () -> serialiseWithJobId(Influencer.TYPE.getPreferredName(), influencer));
+ influencer::getId, () -> toXContentBuilder(influencer));
persistable.persist();
// Don't commit as we expect masses of these updates and they're not
// read again by this process

@@ -251,7 +275,7 @@ public class JobResultsPersister extends AbstractComponent {
}

- XContentBuilder serialiseWithJobId(String objField, ToXContent obj) throws IOException {
+ XContentBuilder toXContentBuilder(ToXContent obj) throws IOException {
XContentBuilder builder = jsonBuilder();
obj.toXContent(builder, ToXContent.EMPTY_PARAMS);
return builder;

@@ -78,7 +78,7 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess {
@Override
public String flushJob(InterimResultsParams params) throws IOException {
FlushAcknowledgement flushAcknowledgement = new FlushAcknowledgement(FLUSH_ID);
- AutodetectResult result = new AutodetectResult(null, null, null, null, null, null, flushAcknowledgement);
+ AutodetectResult result = new AutodetectResult(null, null, null, null, null, null, null, null, flushAcknowledgement);
XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent());
builder.value(result);
pipedProcessOutStream.write(builder.string().getBytes(StandardCharsets.UTF_8));

@@ -14,14 +14,17 @@ import org.elasticsearch.xpack.prelert.job.ModelSnapshot;
import org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.prelert.job.process.normalizer.Renormaliser;
import org.elasticsearch.xpack.prelert.job.quantiles.Quantiles;
+ import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord;
import org.elasticsearch.xpack.prelert.job.results.AutodetectResult;
import org.elasticsearch.xpack.prelert.job.results.Bucket;
import org.elasticsearch.xpack.prelert.job.results.CategoryDefinition;
+ import org.elasticsearch.xpack.prelert.job.results.Influencer;
import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutput;
import org.elasticsearch.xpack.prelert.utils.CloseableIterator;

import java.io.InputStream;
import java.time.Duration;
+ import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;

@@ -69,8 +72,10 @@ public class AutoDetectResultProcessor {
while (iterator.hasNext()) {
AutodetectResult result = iterator.next();
processResult(context, result);
- bucketCount++;
- LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, bucketCount);
+ if (result.getBucket() != null) {
+ bucketCount++;
+ LOGGER.trace("[{}] Bucket number {} parsed from output", jobId, bucketCount);
+ }
}
LOGGER.info("[{}] {} buckets parsed from autodetect output - about to refresh indexes", jobId, bucketCount);
LOGGER.info("[{}] Parse results Complete", jobId);

@@ -102,6 +107,14 @@ public class AutoDetectResultProcessor {
}
persister.persistBucket(bucket);
}
+ List<AnomalyRecord> records = result.getRecords();
+ if (records != null && !records.isEmpty()) {
+ persister.persistRecords(records);
+ }
+ List<Influencer> influencers = result.getInfluencers();
+ if (influencers != null && !influencers.isEmpty()) {
+ persister.persistInfluencers(influencers);
+ }
CategoryDefinition categoryDefinition = result.getCategoryDefinition();
if (categoryDefinition != null) {
persister.persistCategoryDefinition(categoryDefinition);

@@ -12,6 +12,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.prelert.job.ModelSizeStats;
import org.elasticsearch.xpack.prelert.job.ModelSnapshot;

@@ -19,19 +20,25 @@ import org.elasticsearch.xpack.prelert.job.process.autodetect.output.FlushAcknow
import org.elasticsearch.xpack.prelert.job.quantiles.Quantiles;

import java.io.IOException;
+ import java.util.List;
import java.util.Objects;

public class AutodetectResult extends ToXContentToBytes implements Writeable {

public static final ParseField TYPE = new ParseField("autodetect_result");
+ public static final ParseField RECORDS = new ParseField("records");
+ public static final ParseField INFLUENCERS = new ParseField("influencers");

+ @SuppressWarnings("unchecked")
public static final ConstructingObjectParser<AutodetectResult, ParseFieldMatcherSupplier> PARSER = new ConstructingObjectParser<>(
- TYPE.getPreferredName(), a -> new AutodetectResult((Bucket) a[0], (Quantiles) a[1], (ModelSnapshot) a[2],
- a[3] == null ? null : ((ModelSizeStats.Builder) a[3]).build(), (ModelDebugOutput) a[4], (CategoryDefinition) a[5],
- (FlushAcknowledgement) a[6]));
+ TYPE.getPreferredName(), a -> new AutodetectResult((Bucket) a[0], (List<AnomalyRecord>) a[1], (List<Influencer>) a[2],
+ (Quantiles) a[3], (ModelSnapshot) a[4], a[5] == null ? null : ((ModelSizeStats.Builder) a[5]).build(),
+ (ModelDebugOutput) a[6], (CategoryDefinition) a[7], (FlushAcknowledgement) a[8]));

static {
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), Bucket.PARSER, Bucket.TYPE);
+ PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), AnomalyRecord.PARSER, RECORDS);
+ PARSER.declareObjectArray(ConstructingObjectParser.optionalConstructorArg(), Influencer.PARSER, INFLUENCERS);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), Quantiles.PARSER, Quantiles.TYPE);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), ModelSnapshot.PARSER, ModelSnapshot.TYPE);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), ModelSizeStats.PARSER, ModelSizeStats.TYPE);

@@ -41,6 +48,8 @@ public class AutodetectResult extends ToXContentToBytes implements Writeable {
}

private final Bucket bucket;
+ private final List<AnomalyRecord> records;
+ private final List<Influencer> influencers;
private final Quantiles quantiles;
private final ModelSnapshot modelSnapshot;
private final ModelSizeStats modelSizeStats;

@@ -48,9 +57,12 @@ public class AutodetectResult extends ToXContentToBytes implements Writeable {
private final CategoryDefinition categoryDefinition;
private final FlushAcknowledgement flushAcknowledgement;

- public AutodetectResult(Bucket bucket, Quantiles quantiles, ModelSnapshot modelSnapshot, ModelSizeStats modelSizeStats,
- ModelDebugOutput modelDebugOutput, CategoryDefinition categoryDefinition, FlushAcknowledgement flushAcknowledgement) {
+ public AutodetectResult(Bucket bucket, List<AnomalyRecord> records, List<Influencer> influencers, Quantiles quantiles,
+ ModelSnapshot modelSnapshot, ModelSizeStats modelSizeStats, ModelDebugOutput modelDebugOutput,
+ CategoryDefinition categoryDefinition, FlushAcknowledgement flushAcknowledgement) {
this.bucket = bucket;
+ this.records = records;
+ this.influencers = influencers;
this.quantiles = quantiles;
this.modelSnapshot = modelSnapshot;
this.modelSizeStats = modelSizeStats;

@@ -65,6 +77,16 @@ public class AutodetectResult extends ToXContentToBytes implements Writeable {
} else {
this.bucket = null;
}
+ if (in.readBoolean()) {
+ this.records = in.readList(AnomalyRecord::new);
+ } else {
+ this.records = null;
+ }
+ if (in.readBoolean()) {
+ this.influencers = in.readList(Influencer::new);
+ } else {
+ this.influencers = null;
+ }
if (in.readBoolean()) {
this.quantiles = new Quantiles(in);
} else {

@@ -99,75 +121,73 @@ public class AutodetectResult extends ToXContentToBytes implements Writeable {

@Override
public void writeTo(StreamOutput out) throws IOException {
- boolean hasBucket = bucket != null;
- out.writeBoolean(hasBucket);
- if (hasBucket) {
- bucket.writeTo(out);
+ writeNullable(bucket, out);
+ writeNullable(records, out);
+ writeNullable(influencers, out);
+ writeNullable(quantiles, out);
+ writeNullable(modelSnapshot, out);
+ writeNullable(modelSizeStats, out);
+ writeNullable(modelDebugOutput, out);
+ writeNullable(categoryDefinition, out);
+ writeNullable(flushAcknowledgement, out);
}

+ private static void writeNullable(Writeable writeable, StreamOutput out) throws IOException {
+ boolean isPresent = writeable != null;
+ out.writeBoolean(isPresent);
+ if (isPresent) {
+ writeable.writeTo(out);
+ }
- boolean hasQuantiles = quantiles != null;
- out.writeBoolean(hasQuantiles);
- if (hasQuantiles) {
- quantiles.writeTo(out);
- }
- boolean hasModelSnapshot = modelSnapshot != null;
- out.writeBoolean(hasModelSnapshot);
- if (hasModelSnapshot) {
- modelSnapshot.writeTo(out);
- }
- boolean hasModelSizeStats = modelSizeStats != null;
- out.writeBoolean(hasModelSizeStats);
- if (hasModelSizeStats) {
- modelSizeStats.writeTo(out);
- }
- boolean hasModelDebugOutput = modelDebugOutput != null;
- out.writeBoolean(hasModelDebugOutput);
- if (hasModelDebugOutput) {
- modelDebugOutput.writeTo(out);
- }
- boolean hasCategoryDefinition = categoryDefinition != null;
- out.writeBoolean(hasCategoryDefinition);
- if (hasCategoryDefinition) {
- categoryDefinition.writeTo(out);
- }
- boolean hasFlushAcknowledgement = flushAcknowledgement != null;
- out.writeBoolean(hasFlushAcknowledgement);
- if (hasFlushAcknowledgement) {
- flushAcknowledgement.writeTo(out);
- }
+ }

+ private static void writeNullable(List<? extends Writeable> writeables, StreamOutput out) throws IOException {
+ boolean isPresent = writeables != null;
+ out.writeBoolean(isPresent);
+ if (isPresent) {
+ out.writeList(writeables);
+ }
+ }

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
- if (bucket != null) {
- builder.field(Bucket.TYPE.getPreferredName(), bucket);
- }
- if (quantiles != null) {
- builder.field(Quantiles.TYPE.getPreferredName(), quantiles);
- }
- if (modelSnapshot != null) {
- builder.field(ModelSnapshot.TYPE.getPreferredName(), modelSnapshot);
- }
- if (modelSizeStats != null) {
- builder.field(ModelSizeStats.TYPE.getPreferredName(), modelSizeStats);
- }
- if (modelDebugOutput != null) {
- builder.field(ModelDebugOutput.TYPE.getPreferredName(), modelDebugOutput);
- }
- if (categoryDefinition != null) {
- builder.field(CategoryDefinition.TYPE.getPreferredName(), categoryDefinition);
- }
- if (flushAcknowledgement != null) {
- builder.field(FlushAcknowledgement.TYPE.getPreferredName(), flushAcknowledgement);
- }
+ addNullableField(Bucket.TYPE, bucket, builder);
+ addNullableField(RECORDS, records, builder);
+ addNullableField(INFLUENCERS, influencers, builder);
+ addNullableField(Quantiles.TYPE, quantiles, builder);
+ addNullableField(ModelSnapshot.TYPE, modelSnapshot, builder);
+ addNullableField(ModelSizeStats.TYPE, modelSizeStats, builder);
+ addNullableField(ModelDebugOutput.TYPE, modelDebugOutput, builder);
+ addNullableField(CategoryDefinition.TYPE, categoryDefinition, builder);
+ addNullableField(FlushAcknowledgement.TYPE, flushAcknowledgement, builder);
builder.endObject();
return builder;
}

+ private static void addNullableField(ParseField field, ToXContent value, XContentBuilder builder) throws IOException {
+ if (value != null) {
+ builder.field(field.getPreferredName(), value);
+ }
+ }
+
+ private static void addNullableField(ParseField field, List<? extends ToXContent> values, XContentBuilder builder) throws IOException {
+ if (values != null) {
+ builder.field(field.getPreferredName(), values);
+ }
+ }
+
public Bucket getBucket() {
return bucket;
}

+ public List<AnomalyRecord> getRecords() {
+ return records;
+ }
+
+ public List<Influencer> getInfluencers() {
+ return influencers;
+ }
+
public Quantiles getQuantiles() {
return quantiles;
}

@@ -194,7 +214,8 @@ public class AutodetectResult extends ToXContentToBytes implements Writeable {

@Override
public int hashCode() {
- return Objects.hash(bucket, categoryDefinition, flushAcknowledgement, modelDebugOutput, modelSizeStats, modelSnapshot, quantiles);
+ return Objects.hash(bucket, records, influencers, categoryDefinition, flushAcknowledgement, modelDebugOutput, modelSizeStats,
+ modelSnapshot, quantiles);
}

@Override

@@ -207,6 +228,8 @@ public class AutodetectResult extends ToXContentToBytes implements Writeable {
}
AutodetectResult other = (AutodetectResult) obj;
return Objects.equals(bucket, other.bucket) &&
+ Objects.equals(records, other.records) &&
+ Objects.equals(influencers, other.influencers) &&
Objects.equals(categoryDefinition, other.categoryDefinition) &&
Objects.equals(flushAcknowledgement, other.flushAcknowledgement) &&
Objects.equals(modelDebugOutput, other.modelDebugOutput) &&

@@ -6,14 +6,15 @@
package org.elasticsearch.xpack.prelert.job.results;

import org.elasticsearch.action.support.ToXContentToBytes;
+ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcherSupplier;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
- import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
+ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser.Token;
import org.elasticsearch.xpack.prelert.utils.time.TimeUtils;

@@ -45,7 +46,6 @@ public class Bucket extends ToXContentToBytes implements Writeable {
public static final ParseField EVENT_COUNT = new ParseField("eventCount");
public static final ParseField RECORDS = new ParseField("records");
public static final ParseField BUCKET_INFLUENCERS = new ParseField("bucketInfluencers");
- public static final ParseField INFLUENCERS = new ParseField("influencers");
public static final ParseField BUCKET_SPAN = new ParseField("bucketSpan");
public static final ParseField PROCESSING_TIME_MS = new ParseField("processingTimeMs");
public static final ParseField PARTITION_SCORES = new ParseField("partitionScores");

@@ -79,7 +79,6 @@ public class Bucket extends ToXContentToBytes implements Writeable {
PARSER.declareLong(Bucket::setEventCount, EVENT_COUNT);
PARSER.declareObjectArray(Bucket::setRecords, AnomalyRecord.PARSER, RECORDS);
PARSER.declareObjectArray(Bucket::setBucketInfluencers, BucketInfluencer.PARSER, BUCKET_INFLUENCERS);
- PARSER.declareObjectArray(Bucket::setInfluencers, Influencer.PARSER, INFLUENCERS);
PARSER.declareLong(Bucket::setBucketSpan, BUCKET_SPAN);
PARSER.declareLong(Bucket::setProcessingTimeMs, PROCESSING_TIME_MS);
PARSER.declareObjectArray(Bucket::setPartitionScores, PartitionScore.PARSER, PARTITION_SCORES);

@@ -100,7 +99,6 @@ public class Bucket extends ToXContentToBytes implements Writeable {
private boolean isInterim;
private boolean hadBigNormalisedUpdate;
private List<BucketInfluencer> bucketInfluencers = new ArrayList<>();
- private List<Influencer> influencers = Collections.emptyList();
private long processingTimeMs;
private Map<String, Double> perPartitionMaxProbability = Collections.emptyMap();
private List<PartitionScore> partitionScores = Collections.emptyList();

@@ -126,7 +124,6 @@ public class Bucket extends ToXContentToBytes implements Writeable {
isInterim = in.readBoolean();
hadBigNormalisedUpdate = in.readBoolean();
bucketInfluencers = in.readList(BucketInfluencer::new);
- influencers = in.readList(Influencer::new);
processingTimeMs = in.readLong();
perPartitionMaxProbability = (Map<String, Double>) in.readGenericValue();
partitionScores = in.readList(PartitionScore::new);

@@ -151,7 +148,6 @@ public class Bucket extends ToXContentToBytes implements Writeable {
out.writeBoolean(isInterim);
out.writeBoolean(hadBigNormalisedUpdate);
out.writeList(bucketInfluencers);
- out.writeList(influencers);
out.writeLong(processingTimeMs);
out.writeGenericValue(perPartitionMaxProbability);
out.writeList(partitionScores);

@@ -169,11 +165,12 @@ public class Bucket extends ToXContentToBytes implements Writeable {
builder.field(INITIAL_ANOMALY_SCORE.getPreferredName(), initialAnomalyScore);
builder.field(MAX_NORMALIZED_PROBABILITY.getPreferredName(), maxNormalizedProbability);
builder.field(RECORD_COUNT.getPreferredName(), recordCount);
- builder.field(RECORDS.getPreferredName(), records);
+ if (records != null && !records.isEmpty()) {
+ builder.field(RECORDS.getPreferredName(), records);
+ }
builder.field(EVENT_COUNT.getPreferredName(), eventCount);
builder.field(IS_INTERIM.getPreferredName(), isInterim);
builder.field(BUCKET_INFLUENCERS.getPreferredName(), bucketInfluencers);
- builder.field(INFLUENCERS.getPreferredName(), influencers);
builder.field(PROCESSING_TIME_MS.getPreferredName(), processingTimeMs);
builder.field(PARTITION_SCORES.getPreferredName(), partitionScores);
builder.endObject();

@@ -256,10 +253,15 @@ public class Bucket extends ToXContentToBytes implements Writeable {
}

/**
- * Get all the anomaly records associated with this bucket
+ * Get all the anomaly records associated with this bucket.
+ * The records are not part of the bucket document. They will
+ * only be present when the bucket was retrieved and expanded
+ * to contain the associated records.
*
- * @return All the anomaly records
+ * @return <code>null</code> or the anomaly records for the bucket
+ * if the bucket was expanded.
*/
+ @Nullable
public List<AnomalyRecord> getRecords() {
return records;
}

@@ -295,14 +297,6 @@ public class Bucket extends ToXContentToBytes implements Writeable {
processingTimeMs = timeMs;
}

- public List<Influencer> getInfluencers() {
- return influencers;
- }
-
- public void setInfluencers(List<Influencer> influences) {
- this.influencers = influences;
- }
-
public List<BucketInfluencer> getBucketInfluencers() {
return bucketInfluencers;
}

@@ -377,7 +371,7 @@ public class Bucket extends ToXContentToBytes implements Writeable {
// hadBigNormalisedUpdate is deliberately excluded from the hash
// as is id, which is generated by the datastore
return Objects.hash(jobId, timestamp, eventCount, initialAnomalyScore, anomalyScore, maxNormalizedProbability, recordCount, records,
- isInterim, bucketSpan, bucketInfluencers, influencers);
+ isInterim, bucketSpan, bucketInfluencers);
}

/**

@@ -402,7 +396,7 @@ public class Bucket extends ToXContentToBytes implements Writeable {
&& (this.anomalyScore == that.anomalyScore) && (this.initialAnomalyScore == that.initialAnomalyScore)
&& (this.maxNormalizedProbability == that.maxNormalizedProbability) && (this.recordCount == that.recordCount)
&& Objects.equals(this.records, that.records) && Objects.equals(this.isInterim, that.isInterim)
- && Objects.equals(this.bucketInfluencers, that.bucketInfluencers) && Objects.equals(this.influencers, that.influencers);
+ && Objects.equals(this.bucketInfluencers, that.bucketInfluencers);
}

public boolean hadBigNormalisedUpdate() {

@@ -93,7 +93,6 @@ public final class ReservedFieldNames {
Bucket.EVENT_COUNT.getPreferredName(),
Bucket.RECORDS.getPreferredName(),
Bucket.BUCKET_INFLUENCERS.getPreferredName(),
- Bucket.INFLUENCERS.getPreferredName(),
Bucket.INITIAL_ANOMALY_SCORE.getPreferredName(),
Bucket.PROCESSING_TIME_MS.getPreferredName(),
Bucket.PARTITION_SCORES.getPreferredName(),

@@ -10,7 +10,6 @@ import org.elasticsearch.xpack.prelert.job.persistence.QueryPage;
import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord;
import org.elasticsearch.xpack.prelert.job.results.Bucket;
import org.elasticsearch.xpack.prelert.job.results.BucketInfluencer;
- import org.elasticsearch.xpack.prelert.job.results.Influencer;
import org.elasticsearch.xpack.prelert.job.results.PartitionScore;
import org.elasticsearch.xpack.prelert.support.AbstractStreamableTestCase;

@@ -57,22 +56,6 @@ public class GetBucketActionResponseTests extends AbstractStreamableTestCase<Get
if (randomBoolean()) {
bucket.setId(randomAsciiOfLengthBetween(1, 20));
}
- if (randomBoolean()) {
- int size = randomInt(10);
- List<Influencer> influencers = new ArrayList<>(size);
- for (int i = 0; i < size; i++) {
- Influencer influencer = new Influencer(randomAsciiOfLengthBetween(1, 20), randomAsciiOfLengthBetween(1, 20),
- randomAsciiOfLengthBetween(1, 20));
- influencer.setAnomalyScore(randomDouble());
- influencer.setInitialAnomalyScore(randomDouble());
- influencer.setProbability(randomDouble());
- influencer.setId(randomAsciiOfLengthBetween(1, 20));
- influencer.setInterim(randomBoolean());
- influencer.setTimestamp(new Date(randomLong()));
- influencers.add(influencer);
- }
- bucket.setInfluencers(influencers);
- }
if (randomBoolean()) {
bucket.setInitialAnomalyScore(randomDouble());
}

@@ -5,27 +5,24 @@
*/
package org.elasticsearch.xpack.prelert.job.persistence;

- import static org.mockito.Mockito.mock;
- import static org.mockito.Mockito.verifyNoMoreInteractions;
- import static org.mockito.Mockito.when;
-
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.Date;
- import java.util.List;
-
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.test.ESTestCase;
- import org.mockito.ArgumentCaptor;
-
import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord;
import org.elasticsearch.xpack.prelert.job.results.Bucket;
import org.elasticsearch.xpack.prelert.job.results.BucketInfluencer;
import org.elasticsearch.xpack.prelert.job.results.Influencer;
+ import org.mockito.ArgumentCaptor;
+
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Date;
+ import java.util.List;
+
+ import static org.mockito.Mockito.mock;


public class JobResultsPersisterTests extends ESTestCase {

@@ -33,28 +30,19 @@ public class JobResultsPersisterTests extends ESTestCase {
private static final String CLUSTER_NAME = "myCluster";
private static final String JOB_ID = "foo";

- public void testPersistBucket_NoRecords() {
- Client client = mock(Client.class);
- Bucket bucket = mock(Bucket.class);
- when(bucket.getRecords()).thenReturn(null);
- JobResultsPersister persister = new JobResultsPersister(Settings.EMPTY, client);
- persister.persistBucket(bucket);
- verifyNoMoreInteractions(client);
- }
-
public void testPersistBucket_OneRecord() throws IOException {
ArgumentCaptor<XContentBuilder> captor = ArgumentCaptor.forClass(XContentBuilder.class);
BulkResponse response = mock(BulkResponse.class);
String responseId = "abcXZY54321";
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME)
.prepareIndex("prelertresults-" + JOB_ID, Bucket.TYPE.getPreferredName(), responseId, captor)
- .prepareIndex("prelertresults-" + JOB_ID, AnomalyRecord.TYPE.getPreferredName(), "", captor)
.prepareIndex("prelertresults-" + JOB_ID, BucketInfluencer.TYPE.getPreferredName(), "", captor)
- .prepareIndex("prelertresults-" + JOB_ID, Influencer.TYPE.getPreferredName(), "", captor)
.prepareBulk(response);

Client client = clientBuilder.build();
- Bucket bucket = getBucket(1);
+ Bucket bucket = new Bucket("foo");
bucket.setId("1");
bucket.setTimestamp(new Date());
bucket.setId(responseId);
bucket.setAnomalyScore(99.9);
bucket.setBucketSpan(123456);

@@ -72,44 +60,15 @@ public class JobResultsPersisterTests extends ESTestCase {
bi.setRawAnomalyScore(19.19);
bucket.addBucketInfluencer(bi);

- Influencer inf = new Influencer("jobname", "infName1", "infValue1");
- inf.setAnomalyScore(16);
- inf.setId("infID");
- inf.setInitialAnomalyScore(55.5);
- inf.setProbability(0.4);
- inf.setTimestamp(bucket.getTimestamp());
- bucket.setInfluencers(Collections.singletonList(inf));
-
- AnomalyRecord record = bucket.getRecords().get(0);
- List<Double> actuals = new ArrayList<>();
- actuals.add(5.0);
- actuals.add(5.1);
- record.setActual(actuals);
+ // We are adding a record but it shouldn't be persisted as part of the bucket
+ AnomalyRecord record = new AnomalyRecord(JOB_ID);
- record.setAnomalyScore(99.8);
- record.setBucketSpan(42);
- record.setByFieldName("byName");
- record.setByFieldValue("byValue");
- record.setCorrelatedByFieldValue("testCorrelations");
- record.setDetectorIndex(3);
- record.setFieldName("testFieldName");
- record.setFunction("testFunction");
- record.setFunctionDescription("testDescription");
- record.setInitialNormalizedProbability(23.4);
- record.setNormalizedProbability(0.005);
- record.setOverFieldName("overName");
- record.setOverFieldValue("overValue");
- record.setPartitionFieldName("partName");
- record.setPartitionFieldValue("partValue");
- record.setProbability(0.1);
- List<Double> typicals = new ArrayList<>();
- typicals.add(0.44);
- typicals.add(998765.3);
- record.setTypical(typicals);
+ bucket.setRecords(Arrays.asList(record));

JobResultsPersister persister = new JobResultsPersister(Settings.EMPTY, client);
persister.persistBucket(bucket);
List<XContentBuilder> list = captor.getAllValues();
- assertEquals(4, list.size());
+ assertEquals(2, list.size());

String s = list.get(0).string();
assertTrue(s.matches(".*anomalyScore.:99\\.9.*"));

@@ -126,15 +85,50 @@ public class JobResultsPersisterTests extends ESTestCase {
assertTrue(s.matches(".*initialAnomalyScore.:18\\.12.*"));
assertTrue(s.matches(".*anomalyScore.:14\\.15.*"));
assertTrue(s.matches(".*rawAnomalyScore.:19\\.19.*"));
}

- s = list.get(2).string();
- assertTrue(s.matches(".*probability.:0\\.4.*"));
- assertTrue(s.matches(".*influencerFieldName.:.infName1.*"));
- assertTrue(s.matches(".*influencerFieldValue.:.infValue1.*"));
- assertTrue(s.matches(".*initialAnomalyScore.:55\\.5.*"));
- assertTrue(s.matches(".*anomalyScore.:16\\.0.*"));
+ public void testPersistRecords() throws IOException {
+ ArgumentCaptor<XContentBuilder> captor = ArgumentCaptor.forClass(XContentBuilder.class);
+ BulkResponse response = mock(BulkResponse.class);
+ MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME)
+ .prepareIndex("prelertresults-" + JOB_ID, AnomalyRecord.TYPE.getPreferredName(), "", captor)
+ .prepareBulk(response);
+ Client client = clientBuilder.build();

- s = list.get(3).string();
+ List<AnomalyRecord> records = new ArrayList<>();
+ AnomalyRecord r1 = new AnomalyRecord(JOB_ID);
+ records.add(r1);
+ List<Double> actuals = new ArrayList<>();
+ actuals.add(5.0);
+ actuals.add(5.1);
+ r1.setActual(actuals);
+ r1.setAnomalyScore(99.8);
+ r1.setBucketSpan(42);
+ r1.setByFieldName("byName");
+ r1.setByFieldValue("byValue");
+ r1.setCorrelatedByFieldValue("testCorrelations");
+ r1.setDetectorIndex(3);
+ r1.setFieldName("testFieldName");
+ r1.setFunction("testFunction");
+ r1.setFunctionDescription("testDescription");
+ r1.setInitialNormalizedProbability(23.4);
+ r1.setNormalizedProbability(0.005);
+ r1.setOverFieldName("overName");
+ r1.setOverFieldValue("overValue");
+ r1.setPartitionFieldName("partName");
+ r1.setPartitionFieldValue("partValue");
+ r1.setProbability(0.1);
+ List<Double> typicals = new ArrayList<>();
+ typicals.add(0.44);
+ typicals.add(998765.3);
+ r1.setTypical(typicals);
+
+ JobResultsPersister persister = new JobResultsPersister(Settings.EMPTY, client);
+ persister.persistRecords(records);
+ List<XContentBuilder> captured = captor.getAllValues();
+ assertEquals(1, captured.size());
+
+ String s = captured.get(0).string();
assertTrue(s.matches(".*detectorIndex.:3.*"));
assertTrue(s.matches(".*\"probability\":0\\.1.*"));
assertTrue(s.matches(".*\"anomalyScore\":99\\.8.*"));

@@ -155,18 +149,32 @@ public class JobResultsPersisterTests extends ESTestCase {
assertTrue(s.matches(".*overFieldValue.:.overValue.*"));
}

- private Bucket getBucket(int numRecords) {
- Bucket b = new Bucket(JOB_ID);
- b.setId("1");
- b.setTimestamp(new Date());
- List<AnomalyRecord> records = new ArrayList<>();
- for (int i = 0; i < numRecords; ++i) {
- AnomalyRecord r = new AnomalyRecord("foo");
- records.add(r);
- }
- b.setRecords(records);
- return b;
+ public void testPersistInfluencers() throws IOException {
+ ArgumentCaptor<XContentBuilder> captor = ArgumentCaptor.forClass(XContentBuilder.class);
+ BulkResponse response = mock(BulkResponse.class);
+ MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME)
+ .prepareIndex("prelertresults-" + JOB_ID, Influencer.TYPE.getPreferredName(), "", captor)
+ .prepareBulk(response);
+ Client client = clientBuilder.build();
+
+ List<Influencer> influencers = new ArrayList<>();
+ Influencer inf = new Influencer(JOB_ID, "infName1", "infValue1");
+ inf.setAnomalyScore(16);
+ inf.setId("infID");
+ inf.setInitialAnomalyScore(55.5);
+ inf.setProbability(0.4);
+ influencers.add(inf);
+
+ JobResultsPersister persister = new JobResultsPersister(Settings.EMPTY, client);
+ persister.persistInfluencers(influencers);
+ List<XContentBuilder> captured = captor.getAllValues();
+ assertEquals(1, captured.size());
+
+ String s = captured.get(0).string();
+ assertTrue(s.matches(".*probability.:0\\.4.*"));
+ assertTrue(s.matches(".*influencerFieldName.:.infName1.*"));
+ assertTrue(s.matches(".*influencerFieldValue.:.infValue1.*"));
+ assertTrue(s.matches(".*initialAnomalyScore.:55\\.5.*"));
+ assertTrue(s.matches(".*anomalyScore.:16\\.0.*"));
}


}

@@ -11,14 +11,18 @@ import org.elasticsearch.xpack.prelert.job.ModelSnapshot;
import org.elasticsearch.xpack.prelert.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.prelert.job.process.normalizer.Renormaliser;
import org.elasticsearch.xpack.prelert.job.quantiles.Quantiles;
+ import org.elasticsearch.xpack.prelert.job.results.AnomalyRecord;
import org.elasticsearch.xpack.prelert.job.results.AutodetectResult;
import org.elasticsearch.xpack.prelert.job.results.Bucket;
import org.elasticsearch.xpack.prelert.job.results.CategoryDefinition;
+ import org.elasticsearch.xpack.prelert.job.results.Influencer;
import org.elasticsearch.xpack.prelert.job.results.ModelDebugOutput;
import org.elasticsearch.xpack.prelert.utils.CloseableIterator;
import org.mockito.InOrder;

import java.io.InputStream;
+ import java.util.Arrays;
+ import java.util.List;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.inOrder;

@@ -102,6 +106,42 @@ public class AutoDetectResultProcessorTests extends ESTestCase {
assertFalse(context.deleteInterimRequired);
}

+ public void testProcessResult_records() {
+ Renormaliser renormaliser = mock(Renormaliser.class);
+ JobResultsPersister persister = mock(JobResultsPersister.class);
+ AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
+
+ AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", false);
+ context.deleteInterimRequired = false;
+ AutodetectResult result = mock(AutodetectResult.class);
+ AnomalyRecord record1 = new AnomalyRecord("foo");
+ AnomalyRecord record2 = new AnomalyRecord("foo");
+ List<AnomalyRecord> records = Arrays.asList(record1, record2);
+ when(result.getRecords()).thenReturn(records);
+ processor.processResult(context, result);
+
+ verify(persister, times(1)).persistRecords(records);
+ verifyNoMoreInteractions(persister);
+ }
+
+ public void testProcessResult_influencers() {
+ Renormaliser renormaliser = mock(Renormaliser.class);
+ JobResultsPersister persister = mock(JobResultsPersister.class);
+ AutoDetectResultProcessor processor = new AutoDetectResultProcessor(renormaliser, persister, null);
+
+ AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context("foo", false);
+ context.deleteInterimRequired = false;
+ AutodetectResult result = mock(AutodetectResult.class);
+ Influencer influencer1 = new Influencer("foo", "infField", "infValue");
+ Influencer influencer2 = new Influencer("foo", "infField2", "infValue2");
+ List<Influencer> influencers = Arrays.asList(influencer1, influencer2);
+ when(result.getInfluencers()).thenReturn(influencers);
+ processor.processResult(context, result);
+
+ verify(persister, times(1)).persistInfluencers(influencers);
+ verifyNoMoreInteractions(persister);
+ }
+
public void testProcessResult_categoryDefinition() {
Renormaliser renormaliser = mock(Renormaliser.class);
JobResultsPersister persister = mock(JobResultsPersister.class);

@@ -14,7 +14,9 @@ import org.elasticsearch.xpack.prelert.job.process.autodetect.output.FlushAcknow
import org.elasticsearch.xpack.prelert.job.quantiles.Quantiles;
import org.elasticsearch.xpack.prelert.support.AbstractSerializingTestCase;

+ import java.util.ArrayList;
import java.util.Date;
+ import java.util.List;

public class AutodetectResultTests extends AbstractSerializingTestCase<AutodetectResult> {

@@ -26,6 +28,8 @@ public class AutodetectResultTests extends AbstractSerializingTestCase<Autodetec
@Override
protected AutodetectResult createTestInstance() {
Bucket bucket;
+ List<AnomalyRecord> records = null;
+ List<Influencer> influencers = null;
Quantiles quantiles;
ModelSnapshot modelSnapshot;
ModelSizeStats.Builder modelSizeStats;

@@ -39,6 +43,25 @@ public class AutodetectResultTests extends AbstractSerializingTestCase<Autodetec
} else {
bucket = null;
}
+ if (randomBoolean()) {
+ int size = randomInt(10);
+ records = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ AnomalyRecord record = new AnomalyRecord(jobId);
+ record.setProbability(randomDoubleBetween(0.0, 1.0, true));
+ records.add(record);
+ }
+
+ }
+ if (randomBoolean()) {
+ int size = randomInt(10);
+ influencers = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ Influencer influencer = new Influencer(jobId, randomAsciiOfLength(10), randomAsciiOfLength(10));
+ influencer.setProbability(randomDoubleBetween(0.0, 1.0, true));
+ influencers.add(influencer);
+ }
+ }
if (randomBoolean()) {
quantiles = new Quantiles(jobId, new Date(randomLong()), randomAsciiOfLengthBetween(1, 20));
} else {

@@ -73,8 +96,8 @@ public class AutodetectResultTests extends AbstractSerializingTestCase<Autodetec
} else {
flushAcknowledgement = null;
}
- return new AutodetectResult(bucket, quantiles, modelSnapshot, modelSizeStats == null ? null : modelSizeStats.build(),
- modelDebugOutput, categoryDefinition, flushAcknowledgement);
+ return new AutodetectResult(bucket, records, influencers, quantiles, modelSnapshot,
+ modelSizeStats == null ? null : modelSizeStats.build(), modelDebugOutput, categoryDefinition, flushAcknowledgement);
}

@Override

@@ -51,22 +51,6 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
if (randomBoolean()) {
bucket.setId(randomAsciiOfLengthBetween(1, 20));
}
- if (randomBoolean()) {
- int size = randomInt(10);
- List<Influencer> influencers = new ArrayList<>(size);
- for (int i = 0; i < size; i++) {
- Influencer influencer = new Influencer(randomAsciiOfLengthBetween(1, 20), randomAsciiOfLengthBetween(1, 20),
- randomAsciiOfLengthBetween(1, 20));
- influencer.setAnomalyScore(randomDouble());
- influencer.setInitialAnomalyScore(randomDouble());
- influencer.setProbability(randomDouble());
- influencer.setId(randomAsciiOfLengthBetween(1, 20));
- influencer.setInterim(randomBoolean());
- influencer.setTimestamp(new Date(randomLong()));
- influencers.add(influencer);
- }
- bucket.setInfluencers(influencers);
- }
if (randomBoolean()) {
bucket.setInitialAnomalyScore(randomDouble());
}

@@ -238,16 +222,6 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
assertFalse(bucket2.equals(bucket1));
}

- public void testEquals_GivenDifferentInfluencers() {
- Bucket bucket1 = new Bucket("foo");
- Influencer influencer = new Influencer("foo", "inf_field", "inf_value");
- Bucket bucket2 = new Bucket("foo");
- bucket2.setInfluencers(Arrays.asList(influencer));
-
- assertFalse(bucket1.equals(bucket2));
- assertFalse(bucket2.equals(bucket1));
- }
-
public void testEquals_GivenDifferentBucketInfluencers() {
Bucket bucket1 = new Bucket("foo");
BucketInfluencer influencer1 = new BucketInfluencer("foo");

@@ -266,10 +240,7 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {

public void testEquals_GivenEqualBuckets() {
AnomalyRecord record = new AnomalyRecord("jobId");
- Influencer influencer = new Influencer("jobId", "testField", "testValue");
BucketInfluencer bucketInfluencer = new BucketInfluencer("foo");
- influencer.setProbability(0.1);
- influencer.setInitialAnomalyScore(10.0);
Date date = new Date();

Bucket bucket1 = new Bucket("foo");

@@ -282,7 +253,6 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
bucket1.setRecordCount(4);
bucket1.setRecords(Arrays.asList(record));
bucket1.addBucketInfluencer(bucketInfluencer);
- bucket1.setInfluencers(Arrays.asList(influencer));
bucket1.setTimestamp(date);

Bucket bucket2 = new Bucket("foo");

@@ -295,7 +265,6 @@ public class BucketTests extends AbstractSerializingTestCase<Bucket> {
bucket2.setRecordCount(4);
bucket2.setRecords(Arrays.asList(record));
bucket2.addBucketInfluencer(bucketInfluencer);
- bucket2.setInfluencers(Arrays.asList(influencer));
bucket2.setTimestamp(date);

assertTrue(bucket1.equals(bucket2));