[ML] Align special events with buckets (elastic/x-pack-elasticsearch#3258)

Original commit: elastic/x-pack-elasticsearch@71f9d0fb13
This commit is contained in:
David Kyle 2017-12-11 15:42:06 +00:00 committed by GitHub
parent 5fd68959a0
commit 04c07688a2
8 changed files with 128 additions and 49 deletions

View File

@@ -9,6 +9,7 @@ import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
@@ -19,6 +20,7 @@ import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.Operator;
import org.elasticsearch.xpack.ml.job.config.RuleAction;
import org.elasticsearch.xpack.ml.job.config.RuleCondition;
import org.elasticsearch.xpack.ml.utils.Intervals;
import org.elasticsearch.xpack.ml.utils.time.TimeUtils;
import java.io.IOException;
@@ -124,10 +126,27 @@ public class SpecialEvent implements ToXContentObject, Writeable {
return documentId(id);
}
public DetectionRule toDetectionRule() {
/**
* Convert the special event to a detection rule.
* The rule will have 2 time based conditions for the start and
* end of the event.
*
* The rule's start and end times are aligned with the bucket span
* so the start time is rounded down to a bucket interval and the
* end time rounded up.
*
* @param bucketSpan Bucket span to align to
* @return The event as a detection rule.
*/
public DetectionRule toDetectionRule(TimeValue bucketSpan) {
List<RuleCondition> conditions = new ArrayList<>();
conditions.add(RuleCondition.createTime(Operator.GTE, this.getStartTime().toEpochSecond()));
conditions.add(RuleCondition.createTime(Operator.LT, this.getEndTime().toEpochSecond()));
long bucketSpanSecs = bucketSpan.getSeconds();
long bucketStartTime = Intervals.alignToFloor(getStartTime().toEpochSecond(), bucketSpanSecs);
conditions.add(RuleCondition.createTime(Operator.GTE, bucketStartTime));
long bucketEndTime = Intervals.alignToCeil(getEndTime().toEpochSecond(), bucketSpanSecs);
conditions.add(RuleCondition.createTime(Operator.LT, bucketEndTime));
DetectionRule.Builder builder = new DetectionRule.Builder(conditions);
builder.setRuleAction(RuleAction.SKIP_SAMPLING_AND_FILTER_RESULTS);

View File

@@ -16,9 +16,9 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.ml.calendars.SpecialEvent;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.CountingInputStream;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
@@ -39,6 +39,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
@@ -49,6 +51,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
public class AutodetectCommunicator implements Closeable {
@@ -192,17 +195,37 @@
autodetectProcess.writeUpdateModelPlotMessage(updateParams.getModelPlotConfig());
}
List<DetectionRule> eventsAsRules = Collections.emptyList();
if (specialEvents.isEmpty() == false) {
eventsAsRules = specialEvents.stream()
.map(e -> e.toDetectionRule(job.getAnalysisConfig().getBucketSpan()))
.collect(Collectors.toList());
}
// All detection rules for a detector must be updated together as the update
// wipes any previously set rules.
// Build a single list of rules for special events and detection rules.
List<List<DetectionRule>> rules = new ArrayList<>(job.getAnalysisConfig().getDetectors().size());
for (int i = 0; i < job.getAnalysisConfig().getDetectors().size(); i++) {
List<DetectionRule> detectorRules = new ArrayList<>(eventsAsRules);
rules.add(detectorRules);
}
// Add detector rules
if (updateParams.getDetectorUpdates() != null) {
for (JobUpdate.DetectorUpdate update : updateParams.getDetectorUpdates()) {
if (update.getRules() != null) {
autodetectProcess.writeUpdateDetectorRulesMessage(update.getDetectorIndex(), update.getRules());
rules.get(update.getDetectorIndex()).addAll(update.getRules());
}
}
}
if (updateParams.isUpdateSpecialEvents()) {
autodetectProcess.writeUpdateSpecialEventsMessage(job.getAnalysisConfig().getDetectors().size(), specialEvents);
for (int i = 0; i < job.getAnalysisConfig().getDetectors().size(); i++) {
if (!rules.get(i).isEmpty()) {
autodetectProcess.writeUpdateDetectorRulesMessage(i, rules.get(i));
}
}
return null;
}, handler);
}

View File

@@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.ml.job.process.autodetect;
import org.elasticsearch.xpack.ml.calendars.SpecialEvent;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
@@ -75,17 +74,6 @@ public interface AutodetectProcess extends Closeable {
void writeUpdateDetectorRulesMessage(int detectorIndex, List<DetectionRule> rules)
throws IOException;
/**
* Write the updated special events overwriting any previous events.
* Writing an empty list of special events removes any previously set events.
*
* @param numberOfDetectors The number of detectors in the job. All will be
* updated with the special events
* @param specialEvents List of events to update
* @throws IOException If the write fails
*/
void writeUpdateSpecialEventsMessage(int numberOfDetectors, List<SpecialEvent> specialEvents) throws IOException;
/**
* Flush the job pushing any stale data into autodetect.
* Every flush command generates a unique flush Id which will be output

View File

@@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.ml.job.process.autodetect;
import org.elasticsearch.xpack.ml.calendars.SpecialEvent;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
@@ -72,10 +71,6 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess {
public void writeUpdateDetectorRulesMessage(int detectorIndex, List<DetectionRule> rules) throws IOException {
}
@Override
public void writeUpdateSpecialEventsMessage(int numberOfDetectors, List<SpecialEvent> specialEvents) throws IOException {
}
/**
* Accept the request do nothing with it but write the flush acknowledgement to {@link #readAutodetectResults()}
* @param params Should interim results be generated

View File

@@ -9,7 +9,6 @@ import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.calendars.SpecialEvent;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
@@ -42,7 +41,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
/**
* Autodetect process using native code.
@@ -161,16 +159,6 @@ class NativeAutodetectProcess implements AutodetectProcess {
writer.writeUpdateDetectorRulesMessage(detectorIndex, rules);
}
@Override
public void writeUpdateSpecialEventsMessage(int numberOfEvents, List<SpecialEvent> specialEvents) throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(recordWriter, numberOfAnalysisFields);
List<DetectionRule> eventsAsRules = specialEvents.stream().map(SpecialEvent::toDetectionRule).collect(Collectors.toList());
for (int i = 0; i < numberOfEvents; i++) {
writer.writeUpdateDetectorRulesMessage(i, eventsAsRules);
}
}
@Override
public String flushJob(FlushJobParams params) throws IOException {
ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(recordWriter, numberOfAnalysisFields);

View File

@@ -79,8 +79,8 @@ public class FieldConfigWriter {
private void writeDetectors(StringBuilder contents) throws IOException {
int counter = 0;
List<DetectionRule> events = specialEvents.stream().map(SpecialEvent::toDetectionRule).collect(Collectors.toList());
List<DetectionRule> events = specialEvents.stream().map(e -> e.toDetectionRule(config.getBucketSpan()))
.collect(Collectors.toList());
for (Detector detector : config.getDetectors()) {
int detectorId = counter++;

View File

@@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.calendars;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.ml.job.config.Connective;
@@ -25,8 +26,7 @@ import java.util.List;
public class SpecialEventTests extends AbstractSerializingTestCase<SpecialEvent> {
@Override
protected SpecialEvent createTestInstance() {
public static SpecialEvent createSpecialEvent() {
int size = randomInt(10);
List<String> jobIds = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
@@ -39,6 +39,11 @@ public class SpecialEventTests extends AbstractSerializingTestCase<SpecialEvent>
jobIds);
}
@Override
protected SpecialEvent createTestInstance() {
return createSpecialEvent();
}
@Override
protected Writeable.Reader<SpecialEvent> instanceReader() {
return SpecialEvent::new;
@@ -50,8 +55,9 @@ public class SpecialEventTests extends AbstractSerializingTestCase<SpecialEvent>
}
public void testToDetectionRule() {
long bucketSpanSecs = 300;
SpecialEvent event = createTestInstance();
DetectionRule rule = event.toDetectionRule();
DetectionRule rule = event.toDetectionRule(TimeValue.timeValueSeconds(bucketSpanSecs));
assertEquals(Connective.AND, rule.getConditionsConnective());
assertEquals(RuleAction.SKIP_SAMPLING_AND_FILTER_RESULTS, rule.getRuleAction());
@@ -61,10 +67,18 @@ public class SpecialEventTests extends AbstractSerializingTestCase<SpecialEvent>
List<RuleCondition> conditions = rule.getRuleConditions();
assertEquals(2, conditions.size());
assertEquals(RuleConditionType.TIME, conditions.get(0).getConditionType());
assertEquals(Operator.GTE, conditions.get(0).getCondition().getOperator());
assertEquals(event.getStartTime().toEpochSecond(), Long.parseLong(conditions.get(0).getCondition().getValue()));
assertEquals(RuleConditionType.TIME, conditions.get(1).getConditionType());
assertEquals(Operator.GTE, conditions.get(0).getCondition().getOperator());
assertEquals(Operator.LT, conditions.get(1).getCondition().getOperator());
assertEquals(event.getEndTime().toEpochSecond(), Long.parseLong(conditions.get(1).getCondition().getValue()));
// Check times are aligned with the bucket
long conditionStartTime = Long.parseLong(conditions.get(0).getCondition().getValue());
assertEquals(0, conditionStartTime % bucketSpanSecs);
long bucketCount = conditionStartTime / bucketSpanSecs;
assertEquals(bucketSpanSecs * bucketCount, conditionStartTime);
long conditionEndTime = Long.parseLong(conditions.get(1).getCondition().getValue());
assertEquals(0, conditionEndTime % bucketSpanSecs);
assertEquals(bucketSpanSecs * (bucketCount + 1), conditionEndTime);
}
}

View File

@@ -10,11 +10,15 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.action.OpenJobAction.JobTask;
import org.elasticsearch.xpack.ml.calendars.SpecialEvent;
import org.elasticsearch.xpack.ml.calendars.SpecialEventTests;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.config.RuleCondition;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
@@ -23,13 +27,17 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import org.mockito.Mockito;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
@@ -48,6 +56,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
public class AutodetectCommunicatorTests extends ESTestCase {
@@ -65,10 +74,52 @@ public class AutodetectCommunicatorTests extends ESTestCase {
try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class))) {
communicator.writeToJob(new ByteArrayInputStream(new byte[0]),
randomFrom(XContentType.values()), params, (dataCounts, e) -> {});
Mockito.verify(process).writeResetBucketsControlMessage(params);
verify(process).writeResetBucketsControlMessage(params);
}
}
// Verifies that special events are converted into detection rules applied to EVERY
// detector, and merged with any detector-specific rule updates before a single
// writeUpdateDetectorRulesMessage call per detector (the update wipes prior rules,
// so events and updates must be written together).
public void testWriteUpdateProcessMessage() throws IOException {
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
when(process.isReady()).thenReturn(true);
AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class));
// One categorical rule condition used for the detector-specific update below.
List<RuleCondition> conditions = Collections.singletonList(
RuleCondition.createCategorical("foo", "bar"));
// Detector 0 gets an explicit rule update in addition to the special-event rule.
List<JobUpdate.DetectorUpdate> detectorUpdates = Collections.singletonList(
new JobUpdate.DetectorUpdate(0, "updated description",
Collections.singletonList(new DetectionRule.Builder(conditions).build())));
// Third argument presumably maps to isUpdateSpecialEvents() — TODO confirm against UpdateParams.
UpdateParams updateParams = new UpdateParams(null, detectorUpdates, true);
List<SpecialEvent> events = Collections.singletonList(SpecialEventTests.createSpecialEvent());
communicator.writeUpdateProcessMessage(updateParams, events, ((aVoid, e) -> {}));
// There are 2 detectors both will be updated with the rule for the special event.
// The first has an additional update rule
ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class);
InOrder inOrder = Mockito.inOrder(process);
// Detector 0: event rule + its detector-specific rule = 2 rules in one message.
inOrder.verify(process).writeUpdateDetectorRulesMessage(eq(0), captor.capture());
assertEquals(2, captor.getValue().size());
// Detector 1: only the special-event rule.
inOrder.verify(process).writeUpdateDetectorRulesMessage(eq(1), captor.capture());
assertEquals(1, captor.getValue().size());
verify(process).isProcessAlive();
// No extra writes — e.g. no per-event messages beyond the per-detector rule updates.
verifyNoMoreInteractions(process);
// This time there is a single detector update and no special events
detectorUpdates = Collections.singletonList(
new JobUpdate.DetectorUpdate(1, "updated description",
Collections.singletonList(new DetectionRule.Builder(conditions).build())));
updateParams = new UpdateParams(null, detectorUpdates, true);
communicator.writeUpdateProcessMessage(updateParams, Collections.emptyList(), ((aVoid, e) -> {}));
// Only detector 1 should be written this time (detector 0 has nothing to update).
inOrder = Mockito.inOrder(process);
inOrder.verify(process).writeUpdateDetectorRulesMessage(eq(1), captor.capture());
assertEquals(1, captor.getValue().size());
verify(process, times(2)).isProcessAlive();
}
public void testFlushJob() throws IOException {
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
when(process.isProcessAlive()).thenReturn(true);
@@ -175,9 +226,10 @@ public class AutodetectCommunicatorTests extends ESTestCase {
DataDescription.Builder dd = new DataDescription.Builder();
dd.setTimeField("time_field");
Detector.Builder detector = new Detector.Builder("metric", "value");
detector.setByFieldName("host-metric");
AnalysisConfig.Builder ac = new AnalysisConfig.Builder(Collections.singletonList(detector.build()));
Detector.Builder metric = new Detector.Builder("metric", "value");
metric.setByFieldName("host-metric");
Detector.Builder count = new Detector.Builder("count", null);
AnalysisConfig.Builder ac = new AnalysisConfig.Builder(Arrays.asList(metric.build(), count.build()));
builder.setDataDescription(dd);
builder.setAnalysisConfig(ac);