[ML] Add test to prove categorization state written after lookback (#55297)
When a datafeed transitions from lookback to real-time we request that state is persisted from the autodetect process in the background. This PR adds a test to prove that for a categorization job the state that is persisted includes the categorization state. Without the fix from elastic/ml-cpp#1137 this test fails. After that C++ fix is merged this test should pass. Backport of #55243
This commit is contained in:
parent
2a56a3a1f3
commit
8489f8c121
|
@ -9,15 +9,20 @@ import org.apache.logging.log4j.LogManager;
|
||||||
import org.elasticsearch.action.bulk.BulkRequestBuilder;
|
import org.elasticsearch.action.bulk.BulkRequestBuilder;
|
||||||
import org.elasticsearch.action.bulk.BulkResponse;
|
import org.elasticsearch.action.bulk.BulkResponse;
|
||||||
import org.elasticsearch.action.index.IndexRequest;
|
import org.elasticsearch.action.index.IndexRequest;
|
||||||
|
import org.elasticsearch.action.search.SearchResponse;
|
||||||
import org.elasticsearch.action.support.WriteRequest;
|
import org.elasticsearch.action.support.WriteRequest;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.xpack.ml.MachineLearning;
|
import org.elasticsearch.index.query.QueryBuilders;
|
||||||
|
import org.elasticsearch.search.SearchHit;
|
||||||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
|
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
|
||||||
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
|
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
|
||||||
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
|
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
|
||||||
import org.elasticsearch.xpack.core.ml.job.config.Detector;
|
import org.elasticsearch.xpack.core.ml.job.config.Detector;
|
||||||
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
||||||
|
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
|
||||||
|
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.CategorizerState;
|
||||||
import org.elasticsearch.xpack.core.ml.job.results.CategoryDefinition;
|
import org.elasticsearch.xpack.core.ml.job.results.CategoryDefinition;
|
||||||
|
import org.elasticsearch.xpack.ml.MachineLearning;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
||||||
|
@ -25,9 +30,12 @@ import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
|
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
|
||||||
|
import static org.hamcrest.Matchers.arrayWithSize;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
import static org.hamcrest.Matchers.hasKey;
|
||||||
import static org.hamcrest.Matchers.hasSize;
|
import static org.hamcrest.Matchers.hasSize;
|
||||||
import static org.hamcrest.Matchers.is;
|
import static org.hamcrest.Matchers.is;
|
||||||
|
|
||||||
|
@ -159,6 +167,58 @@ public class CategorizationIT extends MlNativeAutodetectIntegTestCase {
|
||||||
"Failed to shutdown [error org.aaaa.bbbb.Cccc line 54 caused by foo exception]")));
|
"Failed to shutdown [error org.aaaa.bbbb.Cccc line 54 caused by foo exception]")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testCategorizationStatePersistedOnSwitchToRealtime() throws Exception {
|
||||||
|
Job.Builder job = newJobBuilder("categorization-swtich-to-realtime", Collections.emptyList());
|
||||||
|
registerJob(job);
|
||||||
|
putJob(job);
|
||||||
|
openJob(job.getId());
|
||||||
|
|
||||||
|
String datafeedId = job.getId() + "-feed";
|
||||||
|
DatafeedConfig.Builder datafeedConfig = new DatafeedConfig.Builder(datafeedId, job.getId());
|
||||||
|
datafeedConfig.setIndices(Collections.singletonList(DATA_INDEX));
|
||||||
|
DatafeedConfig datafeed = datafeedConfig.build();
|
||||||
|
registerDatafeed(datafeed);
|
||||||
|
putDatafeed(datafeed);
|
||||||
|
startDatafeed(datafeedId, 0, null);
|
||||||
|
|
||||||
|
// When the datafeed switches to realtime the C++ process will be told to persist
|
||||||
|
// state, and this should include the categorizer state. We assert that this exists
|
||||||
|
// before closing the job to prove that it was persisted in the background at the
|
||||||
|
// end of lookback rather than when the job was closed.
|
||||||
|
assertBusy(() -> {
|
||||||
|
SearchResponse stateDocsResponse = client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern())
|
||||||
|
.setQuery(QueryBuilders.idsQuery().addIds(CategorizerState.documentId(job.getId(), 1)))
|
||||||
|
.get();
|
||||||
|
|
||||||
|
SearchHit[] hits = stateDocsResponse.getHits().getHits();
|
||||||
|
assertThat(hits, arrayWithSize(1));
|
||||||
|
assertThat(hits[0].getSourceAsMap(), hasKey("compressed"));
|
||||||
|
}, 30, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
stopDatafeed(datafeedId);
|
||||||
|
closeJob(job.getId());
|
||||||
|
|
||||||
|
List<CategoryDefinition> categories = getCategories(job.getId());
|
||||||
|
assertThat(categories.size(), equalTo(3));
|
||||||
|
|
||||||
|
CategoryDefinition category1 = categories.get(0);
|
||||||
|
assertThat(category1.getRegex(), equalTo(".*?Node.+?started.*"));
|
||||||
|
assertThat(category1.getExamples(),
|
||||||
|
equalTo(Arrays.asList("Node 1 started", "Node 2 started")));
|
||||||
|
|
||||||
|
CategoryDefinition category2 = categories.get(1);
|
||||||
|
assertThat(category2.getRegex(), equalTo(".*?Failed.+?to.+?shutdown.+?error.+?" +
|
||||||
|
"org\\.aaaa\\.bbbb\\.Cccc.+?line.+?caused.+?by.+?foo.+?exception.*"));
|
||||||
|
assertThat(category2.getExamples(), equalTo(Collections.singletonList(
|
||||||
|
"Failed to shutdown [error org.aaaa.bbbb.Cccc line 54 caused by foo exception]")));
|
||||||
|
|
||||||
|
CategoryDefinition category3 = categories.get(2);
|
||||||
|
assertThat(category3.getRegex(), equalTo(".*?Failed.+?to.+?shutdown.+?error.+?but.+?" +
|
||||||
|
"this.+?time.+?completely.+?different.*"));
|
||||||
|
assertThat(category3.getExamples(), equalTo(Collections.singletonList(
|
||||||
|
"Failed to shutdown [error but this time completely different]")));
|
||||||
|
}
|
||||||
|
|
||||||
public void testCategorizationPerformance() {
|
public void testCategorizationPerformance() {
|
||||||
// To compare Java/C++ tokenization performance:
|
// To compare Java/C++ tokenization performance:
|
||||||
// 1. Change false to true in this assumption
|
// 1. Change false to true in this assumption
|
||||||
|
|
Loading…
Reference in New Issue