[ML][TEST] Add integ test for categorical rule (elastic/x-pack-elasticsearch#3739)
Original commit: elastic/x-pack-elasticsearch@975a10f810
parent e7b2102126
commit 664fcccb8b
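In brief, the new testCategoricalRule test registers an MlFilter of "safe" IPs, attaches a categorical rule referencing that filter to a count detector over the ip field, and verifies that anomaly records are produced only for IPs outside the filter, both before and after the filter is updated. The following condensed sketch is not part of the commit (the wrapper class and method names are illustrative); it only restates the rule setup using calls that appear in the diff below:

import java.util.Arrays;
import java.util.Collections;

import org.elasticsearch.xpack.core.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
import org.elasticsearch.xpack.core.ml.job.config.RuleCondition;

// Illustrative sketch, not part of the commit: builds the same categorical
// rule the new test uses, i.e. a count detector over "ip" whose results are
// filtered for IPs contained in the "safe_ips" filter.
public class CategoricalRuleSketch {

    public static Detector safeIpCountDetector() {
        // The filter holding the "safe" IPs; the rule references it by id.
        MlFilter safeIps = new MlFilter("safe_ips", Arrays.asList("111.111.111.111", "222.222.222.222"));

        // A categorical condition on the "ip" field bound to that filter.
        RuleCondition condition = RuleCondition.createCategorical("ip", safeIps.getId());
        DetectionRule rule = new DetectionRule.Builder(Collections.singletonList(condition)).build();

        // Attach the rule to a count detector that uses "ip" as the over field,
        // so anomalies for the filtered IPs are suppressed (what the test checks).
        Detector.Builder detector = new Detector.Builder("count", null);
        detector.setOverFieldName("ip");
        detector.setRules(Collections.singletonList(rule));
        return detector.build();
    }
}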
@@ -14,14 +14,17 @@ import org.elasticsearch.xpack.core.ml.job.config.DetectionRule;
 import org.elasticsearch.xpack.core.ml.job.config.Detector;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
+import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
 import org.elasticsearch.xpack.core.ml.job.config.Operator;
 import org.elasticsearch.xpack.core.ml.job.config.RuleCondition;
 import org.elasticsearch.xpack.core.ml.job.config.RuleConditionType;
 import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord;
 import org.junit.After;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -31,6 +34,8 @@ import java.util.stream.Collectors;
 
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.isOneOf;
 
 /**
  * An integration test for detection rules
@@ -42,7 +47,7 @@ public class DetectionRulesIT extends MlNativeAutodetectIntegTestCase {
         cleanUp();
     }
 
-    public void test() throws Exception {
+    public void testNumericalRule() throws Exception {
         RuleCondition condition1 = RuleCondition.createNumerical(
                 RuleConditionType.NUMERICAL_ACTUAL,
                 "by_field",
@@ -143,6 +148,110 @@ public class DetectionRulesIT extends MlNativeAutodetectIntegTestCase {
         assertThat(secondHaldRecordByFieldValues, contains("by_field_value_1", "by_field_value_2"));
     }
 
+    public void testCategoricalRule() throws IOException, InterruptedException {
+        MlFilter safeIps = new MlFilter("safe_ips", Arrays.asList("111.111.111.111", "222.222.222.222"));
+        assertThat(putMlFilter(safeIps), is(true));
+
+        RuleCondition condition = RuleCondition.createCategorical("ip", safeIps.getId());
+        DetectionRule rule = new DetectionRule.Builder(Collections.singletonList(condition)).build();
+
+        Detector.Builder detector = new Detector.Builder("count", null);
+        detector.setRules(Arrays.asList(rule));
+        detector.setOverFieldName("ip");
+
+        AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build()));
+        analysisConfig.setBucketSpan(TimeValue.timeValueHours(1));
+        DataDescription.Builder dataDescription = new DataDescription.Builder();
+        Job.Builder job = new Job.Builder("detection-rule-categorical-test");
+        job.setAnalysisConfig(analysisConfig);
+        job.setDataDescription(dataDescription);
+
+        registerJob(job);
+        putJob(job);
+        openJob(job.getId());
+
+        long timestamp = 1509062400000L;
+        List<String> data = new ArrayList<>();
+
+        // Let's send a bunch of random IPs with counts of 1
+        for (int bucket = 0; bucket < 20; bucket++) {
+            for (int i = 0; i < 5; i++) {
+                data.add(createIpRecord(timestamp, randomAlphaOfLength(10)));
+            }
+            timestamp += TimeValue.timeValueHours(1).getMillis();
+        }
+
+        // Now send anomalous counts for our filtered IPs plus 333.333.333.333
+        List<String> namedIps = Arrays.asList("111.111.111.111", "222.222.222.222", "333.333.333.333");
+        long firstAnomalyTime = timestamp;
+        for (int i = 0; i < 10; i++) {
+            for (String ip : namedIps) {
+                data.add(createIpRecord(timestamp, ip));
+            }
+        }
+
+        // Some more normal buckets
+        for (int bucket = 0; bucket < 3; bucket++) {
+            for (int i = 0; i < 5; i++) {
+                data.add(createIpRecord(timestamp, randomAlphaOfLength(10)));
+            }
+            timestamp += TimeValue.timeValueHours(1).getMillis();
+        }
+
+        postData(job.getId(), joinBetween(0, data.size(), data));
+        data = new ArrayList<>();
+        flushJob(job.getId(), false);
+
+        List<AnomalyRecord> records = getRecords(job.getId());
+        assertThat(records.size(), equalTo(1));
+        assertThat(records.get(0).getTimestamp().getTime(), equalTo(firstAnomalyTime));
+        assertThat(records.get(0).getOverFieldValue(), equalTo("333.333.333.333"));
+
+        // Now let's update the filter
+        MlFilter updatedFilter = new MlFilter(safeIps.getId(), Collections.singletonList("333.333.333.333"));
+        assertThat(putMlFilter(updatedFilter), is(true));
+
+        // We need to give some time for the update to be applied on the autodetect process
+        Thread.sleep(1000);
+
+        long secondAnomalyTime = timestamp;
+        // Send another anomalous bucket
+        for (int i = 0; i < 10; i++) {
+            for (String ip : namedIps) {
+                data.add(createIpRecord(timestamp, ip));
+            }
+        }
+
+        // Some more normal buckets
+        for (int bucket = 0; bucket < 3; bucket++) {
+            for (int i = 0; i < 5; i++) {
+                data.add(createIpRecord(timestamp, randomAlphaOfLength(10)));
+            }
+            timestamp += TimeValue.timeValueHours(1).getMillis();
+        }
+
+        postData(job.getId(), joinBetween(0, data.size(), data));
+        flushJob(job.getId(), false);
+
+        GetRecordsAction.Request getRecordsRequest = new GetRecordsAction.Request(job.getId());
+        getRecordsRequest.setStart(Long.toString(firstAnomalyTime + 1));
+        records = getRecords(getRecordsRequest);
+        assertThat(records.size(), equalTo(2));
+        for (AnomalyRecord record : records) {
+            assertThat(record.getTimestamp().getTime(), equalTo(secondAnomalyTime));
+            assertThat(record.getOverFieldValue(), isOneOf("111.111.111.111", "222.222.222.222"));
+        }
+
+        closeJob(job.getId());
+    }
+
+    private String createIpRecord(long timestamp, String ip) throws IOException {
+        Map<String, Object> record = new HashMap<>();
+        record.put("time", timestamp);
+        record.put("ip", ip);
+        return createJsonRecord(record);
+    }
+
     private String joinBetween(int start, int end, List<String> input) {
         StringBuilder result = new StringBuilder();
         for (int i = start; i < end; i++) {
@@ -51,6 +51,7 @@ import org.elasticsearch.xpack.core.ml.action.GetRecordsAction;
 import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
 import org.elasticsearch.xpack.core.ml.action.PostDataAction;
 import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction;
+import org.elasticsearch.xpack.core.ml.action.PutFilterAction;
 import org.elasticsearch.xpack.core.ml.action.PutJobAction;
 import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
 import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
@@ -63,6 +64,7 @@ import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.config.JobState;
 import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus;
 import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
+import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
@@ -408,6 +410,11 @@ abstract class MlNativeAutodetectIntegTestCase extends ESIntegTestCase {
         return forecasts;
     }
 
+    protected boolean putMlFilter(MlFilter filter) {
+        PutFilterAction.Response response = client().execute(PutFilterAction.INSTANCE, new PutFilterAction.Request(filter)).actionGet();
+        return response.isAcknowledged();
+    }
+
     @Override
     protected void ensureClusterStateConsistency() throws IOException {
         if (cluster() != null && cluster().size() > 0) {