From 664fcccb8bb238a798fcb4c326d21beb544720f4 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Mon, 29 Jan 2018 11:45:04 +0000 Subject: [PATCH] [ML][TEST] Add integ test for categorical rule (elastic/x-pack-elasticsearch#3739) Original commit: elastic/x-pack-elasticsearch@975a10f810f61c2c811cb0977a547e0016083a17 --- .../ml/integration/DetectionRulesIT.java | 111 +++++++++++++++++- .../MlNativeAutodetectIntegTestCase.java | 7 ++ 2 files changed, 117 insertions(+), 1 deletion(-) diff --git a/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java b/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java index 8343d7132b2..463c1d1e778 100644 --- a/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java +++ b/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DetectionRulesIT.java @@ -14,14 +14,17 @@ import org.elasticsearch.xpack.core.ml.job.config.DetectionRule; import org.elasticsearch.xpack.core.ml.job.config.Detector; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; +import org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.job.config.Operator; import org.elasticsearch.xpack.core.ml.job.config.RuleCondition; import org.elasticsearch.xpack.core.ml.job.config.RuleConditionType; import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord; import org.junit.After; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -31,6 +34,8 @@ import java.util.stream.Collectors; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.isOneOf; /** * An integration test for detection rules @@ -42,7 +47,7 @@ public class DetectionRulesIT extends MlNativeAutodetectIntegTestCase { cleanUp(); } - public void test() throws Exception { + public void testNumericalRule() throws Exception { RuleCondition condition1 = RuleCondition.createNumerical( RuleConditionType.NUMERICAL_ACTUAL, "by_field", @@ -143,6 +148,110 @@ public class DetectionRulesIT extends MlNativeAutodetectIntegTestCase { assertThat(secondHaldRecordByFieldValues, contains("by_field_value_1", "by_field_value_2")); } + public void testCategoricalRule() throws IOException, InterruptedException { + MlFilter safeIps = new MlFilter("safe_ips", Arrays.asList("111.111.111.111", "222.222.222.222")); + assertThat(putMlFilter(safeIps), is(true)); + + RuleCondition condition = RuleCondition.createCategorical("ip", safeIps.getId()); + DetectionRule rule = new DetectionRule.Builder(Collections.singletonList(condition)).build(); + + Detector.Builder detector = new Detector.Builder("count", null); + detector.setRules(Arrays.asList(rule)); + detector.setOverFieldName("ip"); + + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build())); + analysisConfig.setBucketSpan(TimeValue.timeValueHours(1)); + DataDescription.Builder dataDescription = new DataDescription.Builder(); + Job.Builder job = new Job.Builder("detection-rule-categorical-test"); + job.setAnalysisConfig(analysisConfig); + job.setDataDescription(dataDescription); + + registerJob(job); + putJob(job); + openJob(job.getId()); + + long timestamp = 1509062400000L; + List data = new ArrayList<>(); + + // Let's send a bunch of random IPs with counts of 1 + for (int bucket = 0; bucket < 20; bucket++) { + for (int i = 0; i < 5; i++) { + data.add(createIpRecord(timestamp, randomAlphaOfLength(10))); + } + timestamp += TimeValue.timeValueHours(1).getMillis(); + } + + // Now send anomalous counts for our filtered IPs plus 333.333.333.333 + List namedIps = Arrays.asList("111.111.111.111", "222.222.222.222", "333.333.333.333"); + long firstAnomalyTime = timestamp; + for (int i = 0; i < 10; i++) { + for (String ip : namedIps) { + data.add(createIpRecord(timestamp, ip)); + } + } + + // Some more normal buckets + for (int bucket = 0; bucket < 3; bucket++) { + for (int i = 0; i < 5; i++) { + data.add(createIpRecord(timestamp, randomAlphaOfLength(10))); + } + timestamp += TimeValue.timeValueHours(1).getMillis(); + } + + postData(job.getId(), joinBetween(0, data.size(), data)); + data = new ArrayList<>(); + flushJob(job.getId(), false); + + List records = getRecords(job.getId()); + assertThat(records.size(), equalTo(1)); + assertThat(records.get(0).getTimestamp().getTime(), equalTo(firstAnomalyTime)); + assertThat(records.get(0).getOverFieldValue(), equalTo("333.333.333.333")); + + // Now let's update the filter + MlFilter updatedFilter = new MlFilter(safeIps.getId(), Collections.singletonList("333.333.333.333")); + assertThat(putMlFilter(updatedFilter), is(true)); + + // We need to give some time for the update to be applied on the autodetect process + Thread.sleep(1000); + + long secondAnomalyTime = timestamp; + // Send another anomalous bucket + for (int i = 0; i < 10; i++) { + for (String ip : namedIps) { + data.add(createIpRecord(timestamp, ip)); + } + } + + // Some more normal buckets + for (int bucket = 0; bucket < 3; bucket++) { + for (int i = 0; i < 5; i++) { + data.add(createIpRecord(timestamp, randomAlphaOfLength(10))); + } + timestamp += TimeValue.timeValueHours(1).getMillis(); + } + + postData(job.getId(), joinBetween(0, data.size(), data)); + flushJob(job.getId(), false); + + GetRecordsAction.Request getRecordsRequest = new GetRecordsAction.Request(job.getId()); + getRecordsRequest.setStart(Long.toString(firstAnomalyTime + 1)); + records = getRecords(getRecordsRequest); + assertThat(records.size(), equalTo(2)); + for (AnomalyRecord record : records) { + assertThat(record.getTimestamp().getTime(), equalTo(secondAnomalyTime)); + assertThat(record.getOverFieldValue(), isOneOf("111.111.111.111", "222.222.222.222")); + } + + closeJob(job.getId()); + } + + private String createIpRecord(long timestamp, String ip) throws IOException { + Map record = new HashMap<>(); + record.put("time", timestamp); + record.put("ip", ip); + return createJsonRecord(record); + } + private String joinBetween(int start, int end, List input) { StringBuilder result = new StringBuilder(); for (int i = start; i < end; i++) { diff --git a/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java b/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java index aa1093722df..9304f765c25 100644 --- a/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java +++ b/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java @@ -51,6 +51,7 @@ import org.elasticsearch.xpack.core.ml.action.GetRecordsAction; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.action.PostDataAction; import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction; +import org.elasticsearch.xpack.core.ml.action.PutFilterAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; @@ -63,6 +64,7 @@ import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus; import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; +import org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; @@ -408,6 +410,11 @@ abstract class MlNativeAutodetectIntegTestCase extends ESIntegTestCase { return forecasts; } + protected boolean putMlFilter(MlFilter filter) { + PutFilterAction.Response response = client().execute(PutFilterAction.INSTANCE, new PutFilterAction.Request(filter)).actionGet(); + return response.isAcknowledged(); + } + @Override protected void ensureClusterStateConsistency() throws IOException { if (cluster() != null && cluster().size() > 0) {