From 6f7f466fa3fcb03579023e4042e0fe5c75081408 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 23 Mar 2017 16:43:51 +0100 Subject: [PATCH] [ML] move DataStreamDiagnostics into DataCountsReporter (elastic/x-pack-elasticsearch#775) repair DataStreamDiagnostics Moves DataStreamDiagnostics into DataCountsReporter to survive if job is opened/closed/fed in chunks. relates elastic/x-pack-elasticsearch#764 Original commit: elastic/x-pack-elasticsearch@29c221a451dd6d78de2dcb4d3ee7e926662fa804 --- .../ml/job/process/DataCountsReporter.java | 101 ++++++------- .../ml/job/process/DataStreamDiagnostics.java | 104 ++++++++++---- .../autodetect/AutodetectProcessManager.java | 2 +- .../process/autodetect/state/DataCounts.java | 25 +++- .../writer/AbstractDataToProcessWriter.java | 8 +- .../job/process/DataCountsReporterTests.java | 68 +++++++-- .../process/DataStreamDiagnosticsTests.java | 136 +++++++++--------- .../job/process/DummyDataCountsReporter.java | 24 +++- .../ml/integration/MlBasicMultiNodeIT.java | 103 +++++++++++++ 9 files changed, 388 insertions(+), 183 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java index 0b3a966b2d3..d03bbaac885 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; +import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; import java.io.Closeable; @@ -58,7 +59,7 @@ public class DataCountsReporter extends AbstractComponent implements Closeable { private static final TimeValue PERSIST_INTERVAL = TimeValue.timeValueMillis(10_000L); - private final String jobId; + private final Job job; private final JobDataCountsPersister dataCountsPersister; private final DataCounts totalRecordStats; @@ -78,16 +79,19 @@ public class DataCountsReporter extends AbstractComponent implements Closeable { private volatile boolean persistDataCountsOnNextRecord; private final ThreadPool.Cancellable persistDataCountsDatafeedAction; - public DataCountsReporter(ThreadPool threadPool, Settings settings, String jobId, DataCounts counts, - JobDataCountsPersister dataCountsPersister) { + private DataStreamDiagnostics diagnostics; + + public DataCountsReporter(ThreadPool threadPool, Settings settings, Job job, DataCounts counts, + JobDataCountsPersister dataCountsPersister) { super(settings); - this.jobId = jobId; + this.job = job; this.dataCountsPersister = dataCountsPersister; totalRecordStats = counts; - incrementalRecordStats = new DataCounts(jobId); + incrementalRecordStats = new DataCounts(job.getId()); + diagnostics = new DataStreamDiagnostics(job); acceptablePercentDateParseErrors = ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING.get(settings); acceptablePercentOutOfOrderErrors = ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING.get(settings); @@ -105,7 +109,7 @@ public class DataCountsReporter extends AbstractComponent implements Closeable { * @param inputFieldCount Number of fields in the record. * Note this is not the number of processed fields (by field etc) * but the actual number of fields in the record - * @param recordTimeMs The time of the latest record written + * @param recordTimeMs The time of the record written * in milliseconds from the epoch. */ public void reportRecordWritten(long inputFieldCount, long recordTimeMs) { @@ -132,10 +136,14 @@ public class DataCountsReporter extends AbstractComponent implements Closeable { } if (persistDataCountsOnNextRecord) { + retrieveDiagnosticsIntermediateResults(); + DataCounts copy = new DataCounts(runningTotalStats()); - dataCountsPersister.persistDataCounts(jobId, copy, new LoggingActionListener()); + dataCountsPersister.persistDataCounts(job.getId(), copy, new LoggingActionListener()); persistDataCountsOnNextRecord = false; } + + diagnostics.checkRecord(recordTimeMs); } /** @@ -191,43 +199,6 @@ public class DataCountsReporter extends AbstractComponent implements Closeable { incrementalRecordStats.incrementInputFieldCount(inputFieldCount); } - public void reportEmptyBucket(long recordTimeMs ){ - Date recordDate = new Date(recordTimeMs); - - totalRecordStats.incrementEmptyBucketCount(1); - totalRecordStats.setLatestEmptyBucketTimeStamp(recordDate); - incrementalRecordStats.incrementEmptyBucketCount(1); - incrementalRecordStats.setLatestEmptyBucketTimeStamp(recordDate); - } - - public void reportEmptyBuckets(long additional, long lastRecordTimeMs ){ - Date recordDate = new Date(lastRecordTimeMs); - - totalRecordStats.incrementEmptyBucketCount(additional); - totalRecordStats.setLatestEmptyBucketTimeStamp(recordDate); - incrementalRecordStats.incrementEmptyBucketCount(additional); - incrementalRecordStats.setLatestEmptyBucketTimeStamp(recordDate); - } - - public void reportSparseBucket(long recordTimeMs){ - Date recordDate = new Date(recordTimeMs); - - totalRecordStats.incrementSparseBucketCount(1); - totalRecordStats.setLatestSparseBucketTimeStamp(recordDate); - incrementalRecordStats.incrementSparseBucketCount(1); - incrementalRecordStats.setLatestSparseBucketTimeStamp(recordDate); - } - - public void reportBucket(){ - totalRecordStats.incrementBucketCount(1); - incrementalRecordStats.incrementBucketCount(1); - } - - public void reportBuckets(long additional){ - totalRecordStats.incrementBucketCount(additional); - incrementalRecordStats.incrementBucketCount(additional); - } - /** * Total records seen = records written to the Engine (processed record * count) + date parse error records count + out of order record count. @@ -253,19 +224,19 @@ public class DataCountsReporter extends AbstractComponent implements Closeable { public long getOutOfOrderRecordCount() { return totalRecordStats.getOutOfOrderTimeStampCount(); } - + public long getEmptyBucketCount() { return totalRecordStats.getEmptyBucketCount(); } - + public long getSparseBucketCount() { return totalRecordStats.getSparseBucketCount(); } - + public long getBucketCount() { return totalRecordStats.getBucketCount(); } - + public long getBytesRead() { return totalRecordStats.getInputBytes(); } @@ -273,11 +244,11 @@ public class DataCountsReporter extends AbstractComponent implements Closeable { public Date getLatestRecordTime() { return totalRecordStats.getLatestRecordTimeStamp(); } - + public Date getLatestEmptyBucketTime() { return totalRecordStats.getLatestEmptyBucketTimeStamp(); } - + public Date getLatestSparseBucketTime() { return totalRecordStats.getLatestSparseBucketTimeStamp(); } @@ -315,7 +286,9 @@ public class DataCountsReporter extends AbstractComponent implements Closeable { Date now = new Date(); incrementalRecordStats.setLastDataTimeStamp(now); totalRecordStats.setLastDataTimeStamp(now); - dataCountsPersister.persistDataCounts(jobId, runningTotalStats(), new LoggingActionListener()); + diagnostics.flush(); + retrieveDiagnosticsIntermediateResults(); + dataCountsPersister.persistDataCounts(job.getId(), runningTotalStats(), new LoggingActionListener()); } /** @@ -329,7 +302,7 @@ public class DataCountsReporter extends AbstractComponent implements Closeable { } String status = String.format(Locale.ROOT, - "[%s] %d records written to autodetect; missingFieldCount=%d, invalidDateCount=%d, outOfOrderCount=%d", jobId, + "[%s] %d records written to autodetect; missingFieldCount=%d, invalidDateCount=%d, outOfOrderCount=%d", job.getId(), getProcessedRecordCount(), getMissingFieldErrorCount(), getDateParseErrorsCount(), getOutOfOrderRecordCount()); logger.info(status); @@ -386,7 +359,9 @@ public class DataCountsReporter extends AbstractComponent implements Closeable { } public void startNewIncrementalCount() { - incrementalRecordStats = new DataCounts(jobId); + incrementalRecordStats = new DataCounts(job.getId()); + retrieveDiagnosticsIntermediateResults(); + diagnostics.resetCounts(); } public DataCounts incrementalStats() { @@ -404,18 +379,34 @@ public class DataCountsReporter extends AbstractComponent implements Closeable { persistDataCountsDatafeedAction.cancel(); } + private void retrieveDiagnosticsIntermediateResults() { + totalRecordStats.incrementBucketCount(diagnostics.getEmptyBucketCount()); + totalRecordStats.incrementBucketCount(diagnostics.getBucketCount()); + totalRecordStats.incrementSparseBucketCount(diagnostics.getSparseBucketCount()); + totalRecordStats.updateLatestEmptyBucketTimeStamp(diagnostics.getLatestEmptyBucketTime()); + totalRecordStats.updateLatestSparseBucketTimeStamp(diagnostics.getLatestSparseBucketTime()); + + incrementalRecordStats.incrementEmptyBucketCount(diagnostics.getEmptyBucketCount()); + incrementalRecordStats.incrementBucketCount(diagnostics.getBucketCount()); + incrementalRecordStats.incrementSparseBucketCount(diagnostics.getSparseBucketCount()); + incrementalRecordStats.updateLatestEmptyBucketTimeStamp(diagnostics.getLatestEmptyBucketTime()); + incrementalRecordStats.updateLatestSparseBucketTimeStamp(diagnostics.getLatestSparseBucketTime()); + + diagnostics.resetCounts(); + } + /** * Log success/error */ private class LoggingActionListener implements ActionListener { @Override public void onResponse(Boolean aBoolean) { - logger.trace("[{}] Persisted DataCounts", jobId); + logger.trace("[{}] Persisted DataCounts", job.getId()); } @Override public void onFailure(Exception e) { - logger.debug(new ParameterizedMessage("[{}] Error persisting DataCounts stats", jobId), e); + logger.debug(new ParameterizedMessage("[{}] Error persisting DataCounts stats", job.getId()), e); } } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/DataStreamDiagnostics.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/DataStreamDiagnostics.java index e3ea4d392e0..479aaa44300 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/DataStreamDiagnostics.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/DataStreamDiagnostics.java @@ -6,9 +6,11 @@ package org.elasticsearch.xpack.ml.job.process; import org.apache.logging.log4j.Logger; -import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.apache.lucene.util.Counter; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.xpack.ml.job.config.Job; +import java.util.Date; import java.util.SortedMap; import java.util.TreeMap; @@ -29,9 +31,7 @@ public class DataStreamDiagnostics { private static final int DATA_SPARSITY_THRESHOLD = 2; private static final long MS_IN_SECOND = 1000; - private final DataCountsReporter dataCountsReporter; - private final Logger logger; - + private static final Logger LOGGER = Loggers.getLogger(DataStreamDiagnostics.class); /** * Container for the histogram * @@ -42,19 +42,27 @@ public class DataStreamDiagnostics { * The container gets pruned along the data streaming based on the bucket * window, so it should not contain more than max(MIN_BUCKET_WINDOW, * 'buckets_required_by_latency') + 1 items at any time. + * + * Sparsity can only be calculated after the window has been filled. Currently + * this window is lost if a job gets closed and re-opened. We might fix this + * in future. */ private final SortedMap movingBucketHistogram = new TreeMap<>(); private final long bucketSpan; private final long latency; private long movingBucketCount = 0; - private long lastReportedBucket = -1; + private long latestReportedBucket = -1; - public DataStreamDiagnostics(DataCountsReporter dataCountsReporter, AnalysisConfig analysisConfig, Logger logger) { - this.dataCountsReporter = dataCountsReporter; - this.logger = logger; - bucketSpan = analysisConfig.getBucketSpan().seconds(); - latency = analysisConfig.getLatency().seconds(); + private long bucketCount = 0; + private long emptyBucketCount = 0; + private long latestEmptyBucketTime = -1; + private long sparseBucketCount = 0; + private long latestSparseBucketTime = -1; + + public DataStreamDiagnostics(Job job) { + bucketSpan = job.getAnalysisConfig().getBucketSpan().seconds(); + latency = job.getAnalysisConfig().getLatency().seconds(); } /** @@ -92,8 +100,8 @@ public class DataStreamDiagnostics { ++movingBucketCount; // find the very first bucket - if (lastReportedBucket == -1) { - lastReportedBucket = bucket - 1; + if (latestReportedBucket == -1) { + latestReportedBucket = bucket - 1; } // flush all bucket out of the window @@ -103,17 +111,18 @@ public class DataStreamDiagnostics { /** * Flush Bucket reporting till the given bucket. * - * @param bucketTimeStamp - * The timestamp of the last bucket that can be flushed. + * @param bucketNumber + * The number of the last bucket that can be flushed. */ - private void flush(long bucketTimeStamp) { + private void flush(long bucketNumber) { // check for a longer period of empty buckets - long emptyBuckets = movingBucketHistogram.firstKey() - lastReportedBucket - 1; + long emptyBuckets = movingBucketHistogram.firstKey() - latestReportedBucket - 1; if (emptyBuckets > 0) { - dataCountsReporter.reportBuckets(emptyBuckets); - dataCountsReporter.reportEmptyBuckets(emptyBuckets, (movingBucketHistogram.firstKey() - 1) * bucketSpan * MS_IN_SECOND); - lastReportedBucket = movingBucketHistogram.firstKey() - 1; + bucketCount += emptyBuckets; + emptyBucketCount += emptyBuckets; + latestEmptyBucketTime = (movingBucketHistogram.firstKey() - 1) * bucketSpan * MS_IN_SECOND; + latestReportedBucket = movingBucketHistogram.firstKey() - 1; } // calculate the average number of data points in a bucket based on the @@ -121,23 +130,24 @@ public class DataStreamDiagnostics { double averageBucketSize = (float) movingBucketCount / movingBucketHistogram.size(); // prune all buckets that can be flushed - long lastBucketSparsityCheck = Math.min(bucketTimeStamp, movingBucketHistogram.lastKey()); + long lastBucketSparsityCheck = Math.min(bucketNumber, movingBucketHistogram.lastKey()); for (long pruneBucket = movingBucketHistogram.firstKey(); pruneBucket < lastBucketSparsityCheck; ++pruneBucket) { Counter bucketSizeHolder = movingBucketHistogram.remove(pruneBucket); long bucketSize = bucketSizeHolder != null ? bucketSizeHolder.get() : 0L; - logger.debug("Checking bucket {} compare sizes, this bucket: {} average: {}", pruneBucket, bucketSize, averageBucketSize); - dataCountsReporter.reportBucket(); - lastReportedBucket = pruneBucket; + LOGGER.debug("Checking bucket {} compare sizes, this bucket: {} average: {}", pruneBucket, bucketSize, averageBucketSize); + ++bucketCount; + latestReportedBucket = pruneBucket; // substract bucketSize from the counter movingBucketCount -= bucketSize; // check if bucket is empty if (bucketSize == 0L) { - dataCountsReporter.reportEmptyBucket(pruneBucket * bucketSpan * MS_IN_SECOND); + latestEmptyBucketTime = pruneBucket * bucketSpan * MS_IN_SECOND; + ++emptyBucketCount; // do not do sparse analysis on an empty bucket continue; @@ -150,28 +160,62 @@ public class DataStreamDiagnostics { double sparsityScore = logAverageBucketSize - logBucketSize; if (sparsityScore > DATA_SPARSITY_THRESHOLD) { - logger.debug("Sparse bucket {}, this bucket: {} average: {}, sparsity score: {}", pruneBucket, bucketSize, + LOGGER.debug("Sparse bucket {}, this bucket: {} average: {}, sparsity score: {}", pruneBucket, bucketSize, averageBucketSize, sparsityScore); - dataCountsReporter.reportSparseBucket(pruneBucket * bucketSpan * MS_IN_SECOND); + ++sparseBucketCount; + latestSparseBucketTime = pruneBucket * bucketSpan * MS_IN_SECOND; } } // prune the rest if necessary - for (long pruneBucket = lastBucketSparsityCheck; pruneBucket < bucketTimeStamp; ++pruneBucket) { + for (long pruneBucket = lastBucketSparsityCheck; pruneBucket < bucketNumber; ++pruneBucket) { Counter bucketSizeHolder = movingBucketHistogram.remove(pruneBucket); long bucketSize = bucketSizeHolder != null ? bucketSizeHolder.get() : 0L; - dataCountsReporter.reportBucket(); - lastReportedBucket = pruneBucket; + bucketCount++; + latestReportedBucket = pruneBucket; // substract bucketSize from the counter movingBucketCount -= bucketSize; // check if bucket is empty if (bucketSize == 0L) { - dataCountsReporter.reportEmptyBucket(pruneBucket * bucketSpan * MS_IN_SECOND); + latestEmptyBucketTime = pruneBucket * bucketSpan * MS_IN_SECOND; + ++emptyBucketCount; } } } + public long getBucketCount() { + return bucketCount; + } + + public long getEmptyBucketCount() { + return emptyBucketCount; + } + + public Date getLatestEmptyBucketTime() { + return latestEmptyBucketTime > 0 ? new Date(latestEmptyBucketTime) : null; + } + + public long getSparseBucketCount() { + return sparseBucketCount; + } + + public Date getLatestSparseBucketTime() { + return latestSparseBucketTime > 0 ? new Date(latestSparseBucketTime) : null; + } + + /** + * Resets counts, + * + * Note: This does not reset the inner state for e.g. sparse bucket + * detection. + * + */ + public void resetCounts() { + bucketCount = 0; + emptyBucketCount = 0; + sparseBucketCount = 0; + } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 48c3667d1f1..ef5ca2cf19c 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -256,7 +256,7 @@ public class AutodetectProcessManager extends AbstractComponent { Job job = jobManager.getJobOrThrowIfUnknown(jobId); // A TP with no queue, so that we fail immediately if there are no threads available ExecutorService executorService = threadPool.executor(MachineLearning.AUTODETECT_PROCESS_THREAD_POOL_NAME); - try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job.getId(), dataCounts, + try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job, dataCounts, jobDataCountsPersister)) { ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobProvider, new JobRenormalizedResultsPersister(settings, client), normalizerFactory); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/DataCounts.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/DataCounts.java index 9c059222eeb..0e695ab6421 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/DataCounts.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/state/DataCounts.java @@ -442,6 +442,14 @@ public class DataCounts extends ToXContentToBytes implements Writeable { this.latestRecordTimeStamp = latestRecordTimeStamp; } + public void updateLatestRecordTimeStamp(Date latestRecordTimeStamp) { + if (latestRecordTimeStamp != null && + (this.latestRecordTimeStamp == null || + latestRecordTimeStamp.after(this.latestRecordTimeStamp))) { + this.latestRecordTimeStamp = latestRecordTimeStamp; + } + } + /** * The wall clock time the latest record was seen. * @@ -468,7 +476,14 @@ public class DataCounts extends ToXContentToBytes implements Writeable { this.latestEmptyBucketTimeStamp = latestEmptyBucketTimeStamp; } - + public void updateLatestEmptyBucketTimeStamp(Date latestEmptyBucketTimeStamp) { + if (latestEmptyBucketTimeStamp != null && + (this.latestEmptyBucketTimeStamp == null || + latestEmptyBucketTimeStamp.after(this.latestEmptyBucketTimeStamp))) { + this.latestEmptyBucketTimeStamp = latestEmptyBucketTimeStamp; + } + } + /** * The time of the latest sparse bucket seen. * @@ -481,6 +496,14 @@ public class DataCounts extends ToXContentToBytes implements Writeable { public void setLatestSparseBucketTimeStamp(Date latestSparseBucketTimeStamp) { this.latestSparseBucketTimeStamp = latestSparseBucketTimeStamp; } + + public void updateLatestSparseBucketTimeStamp(Date latestSparseBucketTimeStamp) { + if (latestSparseBucketTimeStamp != null && + (this.latestSparseBucketTimeStamp == null || + latestSparseBucketTimeStamp.after(this.latestSparseBucketTimeStamp))) { + this.latestSparseBucketTimeStamp = latestSparseBucketTimeStamp; + } + } @Override public void writeTo(StreamOutput out) throws IOException { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java index ca926d40d2b..e39e439cee3 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java @@ -9,7 +9,6 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.ml.job.config.DataDescription; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; -import org.elasticsearch.xpack.ml.job.process.DataStreamDiagnostics; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; import java.io.IOException; @@ -41,7 +40,6 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter private final Logger logger; private final DateTransformer dateTransformer; - private final DataStreamDiagnostics diagnostics; private long latencySeconds; protected Map inFieldIndexes; @@ -60,7 +58,6 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter this.analysisConfig = Objects.requireNonNull(analysisConfig); this.dataCountsReporter = Objects.requireNonNull(dataCountsReporter); this.logger = Objects.requireNonNull(logger); - this.diagnostics = new DataStreamDiagnostics(this.dataCountsReporter, this.analysisConfig, this.logger); this.latencySeconds = analysisConfig.getLatency().seconds(); Date date = dataCountsReporter.getLatestRecordTime(); @@ -158,11 +155,8 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter latestEpochMs = Math.max(latestEpochMs, epochMs); latestEpochMsThisUpload = latestEpochMs; - // check record in diagnostics - diagnostics.checkRecord(epochMs); - autodetectProcess.writeRecord(record); - dataCountsReporter.reportRecordWritten(numberOfFieldsRead, latestEpochMs); + dataCountsReporter.reportRecordWritten(numberOfFieldsRead, epochMs); return true; } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java index cf691c1f4b5..31f9c230846 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporterTests.java @@ -6,9 +6,13 @@ package org.elasticsearch.xpack.ml.job.process; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.env.Environment; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.ml.job.config.Detector; +import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.junit.Before; @@ -16,6 +20,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import java.io.IOException; +import java.util.Arrays; import java.util.Date; import static org.mockito.Matchers.any; @@ -26,20 +31,31 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class DataCountsReporterTests extends ESTestCase { - private static final String JOB_ID = "SR"; private static final int MAX_PERCENT_DATE_PARSE_ERRORS = 40; private static final int MAX_PERCENT_OUT_OF_ORDER_ERRORS = 30; - + + private Job job; private JobDataCountsPersister jobDataCountsPersister; private ThreadPool threadPool; private Settings settings; - + @Before public void setUpMocks() { settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()) .put(DataCountsReporter.ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING.getKey(), MAX_PERCENT_DATE_PARSE_ERRORS) .put(DataCountsReporter.ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING.getKey(), MAX_PERCENT_OUT_OF_ORDER_ERRORS) .build(); + + AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("metric", "field").build())); + acBuilder.setBucketSpan(TimeValue.timeValueSeconds(300)); + acBuilder.setLatency(TimeValue.ZERO); + acBuilder.setDetectors(Arrays.asList(new Detector.Builder("metric", "field").build())); + + Job.Builder builder = new Job.Builder("sr"); + builder.setAnalysisConfig(acBuilder); + builder.setCreateTime(new Date()); + job = builder.build(); + jobDataCountsPersister = Mockito.mock(JobDataCountsPersister.class); threadPool = Mockito.mock(ThreadPool.class); @@ -56,7 +72,7 @@ public class DataCountsReporterTests extends ESTestCase { } public void testSettingAcceptablePercentages() throws IOException { - try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, JOB_ID, new DataCounts(JOB_ID), + try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job, new DataCounts(job.getId()), jobDataCountsPersister)) { assertEquals(dataCountsReporter.getAcceptablePercentDateParseErrors(), MAX_PERCENT_DATE_PARSE_ERRORS); assertEquals(dataCountsReporter.getAcceptablePercentOutOfOrderErrors(), MAX_PERCENT_OUT_OF_ORDER_ERRORS); @@ -64,7 +80,7 @@ public class DataCountsReporterTests extends ESTestCase { } public void testSimpleConstructor() throws Exception { - try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, JOB_ID, new DataCounts(JOB_ID), + try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job, new DataCounts(job.getId()), jobDataCountsPersister)) { DataCounts stats = dataCountsReporter.incrementalStats(); assertNotNull(stats); @@ -77,7 +93,7 @@ public class DataCountsReporterTests extends ESTestCase { new Date(), new Date(), new Date(), new Date(), new Date()); try (DataCountsReporter dataCountsReporter = - new DataCountsReporter(threadPool, settings, JOB_ID, counts, jobDataCountsPersister)) { + new DataCountsReporter(threadPool, settings, job, counts, jobDataCountsPersister)) { DataCounts stats = dataCountsReporter.incrementalStats(); assertNotNull(stats); assertAllCountFieldsEqualZero(stats); @@ -95,7 +111,7 @@ public class DataCountsReporterTests extends ESTestCase { } public void testResetIncrementalCounts() throws Exception { - try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, JOB_ID, new DataCounts(JOB_ID), + try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job, new DataCounts(job.getId()), jobDataCountsPersister)) { DataCounts stats = dataCountsReporter.incrementalStats(); assertNotNull(stats); @@ -117,11 +133,33 @@ public class DataCountsReporterTests extends ESTestCase { stats = dataCountsReporter.incrementalStats(); assertNotNull(stats); assertAllCountFieldsEqualZero(stats); + + // write some more data + dataCountsReporter.reportRecordWritten(5, 302000); + dataCountsReporter.reportRecordWritten(5, 302000); + assertEquals(2, dataCountsReporter.incrementalStats().getInputRecordCount()); + assertEquals(10, dataCountsReporter.incrementalStats().getInputFieldCount()); + assertEquals(2, dataCountsReporter.incrementalStats().getProcessedRecordCount()); + assertEquals(6, dataCountsReporter.incrementalStats().getProcessedFieldCount()); + assertEquals(302000, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime()); + + // check total stats + assertEquals(4, dataCountsReporter.runningTotalStats().getInputRecordCount()); + assertEquals(20, dataCountsReporter.runningTotalStats().getInputFieldCount()); + assertEquals(4, dataCountsReporter.runningTotalStats().getProcessedRecordCount()); + assertEquals(12, dataCountsReporter.runningTotalStats().getProcessedFieldCount()); + assertEquals(302000, dataCountsReporter.runningTotalStats().getLatestRecordTimeStamp().getTime()); + + // send 'flush' signal + dataCountsReporter.finishReporting(); + assertEquals(2, dataCountsReporter.runningTotalStats().getBucketCount()); + assertEquals(0, dataCountsReporter.runningTotalStats().getEmptyBucketCount()); + assertEquals(0, dataCountsReporter.runningTotalStats().getSparseBucketCount()); } } public void testReportLatestTimeIncrementalStats() throws IOException { - try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, JOB_ID, new DataCounts(JOB_ID), + try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job, new DataCounts(job.getId()), jobDataCountsPersister)) { dataCountsReporter.startNewIncrementalCount(); dataCountsReporter.reportLatestTimeIncrementalStats(5001L); @@ -130,7 +168,7 @@ public class DataCountsReporterTests extends ESTestCase { } public void testReportRecordsWritten() { - try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, JOB_ID, new DataCounts(JOB_ID), + try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job, new DataCounts(job.getId()), jobDataCountsPersister)) { dataCountsReporter.setAnalysedFieldsPerRecord(3); @@ -248,12 +286,12 @@ public class DataCountsReporterTests extends ESTestCase { } public void testFinishReporting() { - try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, JOB_ID, new DataCounts(JOB_ID), + try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job, new DataCounts(job.getId()), jobDataCountsPersister)) { dataCountsReporter.setAnalysedFieldsPerRecord(3); Date now = new Date(); - DataCounts dc = new DataCounts(JOB_ID, 2L, 5L, 0L, 10L, 0L, 1L, 0L, 0L, 0L, 0L, new Date(2000), new Date(3000), + DataCounts dc = new DataCounts(job.getId(), 2L, 5L, 0L, 10L, 0L, 1L, 0L, 0L, 0L, 1L, new Date(2000), new Date(3000), now, (Date) null, (Date) null); dataCountsReporter.reportRecordWritten(5, 2000); dataCountsReporter.reportRecordWritten(5, 3000); @@ -267,7 +305,7 @@ public class DataCountsReporterTests extends ESTestCase { dataCountsReporter.runningTotalStats().getLastDataTimeStamp()); dc.setLastDataTimeStamp(dataCountsReporter.incrementalStats().getLastDataTimeStamp()); - Mockito.verify(jobDataCountsPersister, Mockito.times(1)).persistDataCounts(eq("SR"), eq(dc), any()); + Mockito.verify(jobDataCountsPersister, Mockito.times(1)).persistDataCounts(eq("sr"), eq(dc), any()); assertEquals(dc, dataCountsReporter.incrementalStats()); } } @@ -289,7 +327,7 @@ public class DataCountsReporterTests extends ESTestCase { } }); - try (DataCountsReporter dataCountsReporter = new DataCountsReporter(mockThreadPool, settings, JOB_ID, new DataCounts(JOB_ID), + try (DataCountsReporter dataCountsReporter = new DataCountsReporter(mockThreadPool, settings, job, new DataCounts(job.getId()), jobDataCountsPersister)) { dataCountsReporter.setAnalysedFieldsPerRecord(3); @@ -297,10 +335,10 @@ public class DataCountsReporterTests extends ESTestCase { dataCountsReporter.reportRecordWritten(5, 2000); dataCountsReporter.reportRecordWritten(5, 3000); - Mockito.verify(jobDataCountsPersister, Mockito.times(0)).persistDataCounts(eq("SR"), any(), any()); + Mockito.verify(jobDataCountsPersister, Mockito.times(0)).persistDataCounts(eq("sr"), any(), any()); argumentCaptor.getValue().run(); dataCountsReporter.reportRecordWritten(5, 4000); - Mockito.verify(jobDataCountsPersister, Mockito.times(1)).persistDataCounts(eq("SR"), any(), any()); + Mockito.verify(jobDataCountsPersister, Mockito.times(1)).persistDataCounts(eq("sr"), any(), any()); } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DataStreamDiagnosticsTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DataStreamDiagnosticsTests.java index 3cc647ac46f..5508bd03d81 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DataStreamDiagnosticsTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DataStreamDiagnosticsTests.java @@ -5,12 +5,11 @@ */ package org.elasticsearch.xpack.ml.job.process; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.ml.job.config.Detector; +import org.elasticsearch.xpack.ml.job.config.Job; import org.junit.Before; import java.io.IOException; @@ -19,21 +18,23 @@ import java.util.Date; public class DataStreamDiagnosticsTests extends ESTestCase { - private AnalysisConfig analysisConfig; - private Logger logger; - + private Job job; + @Before public void setUpMocks() throws IOException { - logger = Loggers.getLogger(DataStreamDiagnosticsTests.class); - AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("metric", "field").build())); acBuilder.setBucketSpan(TimeValue.timeValueSeconds(60)); - analysisConfig = acBuilder.build(); + acBuilder.setLatency(TimeValue.ZERO); + acBuilder.setDetectors(Arrays.asList(new Detector.Builder("metric", "field").build())); + + Job.Builder builder = new Job.Builder("job_id"); + builder.setAnalysisConfig(acBuilder); + builder.setCreateTime(new Date()); + job = builder.build(); } public void testSimple() { - DataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); - DataStreamDiagnostics d = new DataStreamDiagnostics(dataCountsReporter, analysisConfig, logger); + DataStreamDiagnostics d = new DataStreamDiagnostics(job); d.checkRecord(70000); d.checkRecord(130000); @@ -47,16 +48,15 @@ public class DataStreamDiagnosticsTests extends ESTestCase { d.checkRecord(610000); d.flush(); - assertEquals(10, dataCountsReporter.getBucketCount()); - assertEquals(0, dataCountsReporter.getEmptyBucketCount()); - assertEquals(0, dataCountsReporter.getSparseBucketCount()); - assertEquals(null, dataCountsReporter.getLatestSparseBucketTime()); - assertEquals(null, dataCountsReporter.getLatestEmptyBucketTime()); + assertEquals(10, d.getBucketCount()); + assertEquals(0, d.getEmptyBucketCount()); + assertEquals(0, d.getSparseBucketCount()); + assertEquals(null, d.getLatestSparseBucketTime()); + assertEquals(null, d.getLatestEmptyBucketTime()); } public void testEmptyBuckets() { - DataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); - DataStreamDiagnostics d = new DataStreamDiagnostics(dataCountsReporter, analysisConfig, logger); + DataStreamDiagnostics d = new DataStreamDiagnostics(job); d.checkRecord(10000); d.checkRecord(70000); @@ -70,16 +70,15 @@ public class DataStreamDiagnosticsTests extends ESTestCase { d.checkRecord(550000); d.flush(); - assertEquals(10, dataCountsReporter.getBucketCount()); - assertEquals(2, dataCountsReporter.getEmptyBucketCount()); - assertEquals(0, dataCountsReporter.getSparseBucketCount()); - assertEquals(null, dataCountsReporter.getLatestSparseBucketTime()); - assertEquals(new Date(420000), dataCountsReporter.getLatestEmptyBucketTime()); + assertEquals(10, d.getBucketCount()); + assertEquals(2, d.getEmptyBucketCount()); + assertEquals(0, d.getSparseBucketCount()); + assertEquals(null, d.getLatestSparseBucketTime()); + assertEquals(new Date(420000), d.getLatestEmptyBucketTime()); } public void testEmptyBucketsStartLater() { - DataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); - DataStreamDiagnostics d = new DataStreamDiagnostics(dataCountsReporter, analysisConfig, logger); + DataStreamDiagnostics d = new DataStreamDiagnostics(job); d.checkRecord(1110000); d.checkRecord(1170000); @@ -93,16 +92,15 @@ public class DataStreamDiagnosticsTests extends ESTestCase { d.checkRecord(1650000); d.flush(); - assertEquals(10, dataCountsReporter.getBucketCount()); - assertEquals(2, dataCountsReporter.getEmptyBucketCount()); - assertEquals(0, dataCountsReporter.getSparseBucketCount()); - assertEquals(null, dataCountsReporter.getLatestSparseBucketTime()); - assertEquals(new Date(1500000), dataCountsReporter.getLatestEmptyBucketTime()); + assertEquals(10, d.getBucketCount()); + assertEquals(2, d.getEmptyBucketCount()); + assertEquals(0, d.getSparseBucketCount()); + assertEquals(null, d.getLatestSparseBucketTime()); + assertEquals(new Date(1500000), d.getLatestEmptyBucketTime()); } public void testSparseBuckets() { - DataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); - DataStreamDiagnostics d = new DataStreamDiagnostics(dataCountsReporter, analysisConfig, logger); + DataStreamDiagnostics d = new DataStreamDiagnostics(job); sendManyDataPoints(d, 10000, 69000, 1000); sendManyDataPoints(d, 70000, 129000, 1200); @@ -118,11 +116,11 @@ public class DataStreamDiagnosticsTests extends ESTestCase { sendManyDataPoints(d, 550000, 609000, 1400); d.flush(); - assertEquals(10, dataCountsReporter.getBucketCount()); - assertEquals(0, dataCountsReporter.getEmptyBucketCount()); - assertEquals(2, dataCountsReporter.getSparseBucketCount()); - assertEquals(new Date(420000), dataCountsReporter.getLatestSparseBucketTime()); - assertEquals(null, dataCountsReporter.getLatestEmptyBucketTime()); + assertEquals(10, d.getBucketCount()); + assertEquals(0, d.getEmptyBucketCount()); + assertEquals(2, d.getSparseBucketCount()); + assertEquals(new Date(420000), d.getLatestSparseBucketTime()); + assertEquals(null, d.getLatestEmptyBucketTime()); } /** @@ -130,8 +128,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase { * signal */ public void testSparseBucketsLast() { - DataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); - DataStreamDiagnostics d = new DataStreamDiagnostics(dataCountsReporter, analysisConfig, logger); + DataStreamDiagnostics d = new DataStreamDiagnostics(job); sendManyDataPoints(d, 10000, 69000, 1000); sendManyDataPoints(d, 70000, 129000, 1200); @@ -147,11 +144,11 @@ public class DataStreamDiagnosticsTests extends ESTestCase { sendManyDataPoints(d, 550000, 609000, 10); d.flush(); - assertEquals(10, dataCountsReporter.getBucketCount()); - assertEquals(0, dataCountsReporter.getEmptyBucketCount()); - assertEquals(1, dataCountsReporter.getSparseBucketCount()); - assertEquals(new Date(120000), dataCountsReporter.getLatestSparseBucketTime()); - assertEquals(null, dataCountsReporter.getLatestEmptyBucketTime()); + assertEquals(10, d.getBucketCount()); + assertEquals(0, d.getEmptyBucketCount()); + assertEquals(1, d.getSparseBucketCount()); + assertEquals(new Date(120000), d.getLatestSparseBucketTime()); + assertEquals(null, d.getLatestEmptyBucketTime()); } /** @@ -159,8 +156,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase { * signal on the 2nd to last */ public void testSparseBucketsLastTwo() { - DataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); - DataStreamDiagnostics d = new DataStreamDiagnostics(dataCountsReporter, analysisConfig, logger); + DataStreamDiagnostics d = new DataStreamDiagnostics(job); sendManyDataPoints(d, 10000, 69000, 1000); sendManyDataPoints(d, 70000, 129000, 1200); @@ -177,16 +173,15 @@ public class DataStreamDiagnosticsTests extends ESTestCase { sendManyDataPoints(d, 550000, 609000, 10); d.flush(); - assertEquals(10, dataCountsReporter.getBucketCount()); - assertEquals(0, dataCountsReporter.getEmptyBucketCount()); - assertEquals(2, dataCountsReporter.getSparseBucketCount()); - assertEquals(new Date(480000), dataCountsReporter.getLatestSparseBucketTime()); - assertEquals(null, dataCountsReporter.getLatestEmptyBucketTime()); + assertEquals(10, d.getBucketCount()); + assertEquals(0, d.getEmptyBucketCount()); + assertEquals(2, d.getSparseBucketCount()); + assertEquals(new Date(480000), d.getLatestSparseBucketTime()); + assertEquals(null, d.getLatestEmptyBucketTime()); } public void testMixedEmptyAndSparseBuckets() { - DataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); - DataStreamDiagnostics d = new DataStreamDiagnostics(dataCountsReporter, analysisConfig, logger); + DataStreamDiagnostics d = new DataStreamDiagnostics(job); sendManyDataPoints(d, 10000, 69000, 1000); sendManyDataPoints(d, 70000, 129000, 1200); @@ -202,11 +197,11 @@ public class DataStreamDiagnosticsTests extends ESTestCase { sendManyDataPoints(d, 550000, 609000, 1400); d.flush(); - assertEquals(10, dataCountsReporter.getBucketCount()); - assertEquals(2, dataCountsReporter.getSparseBucketCount()); - assertEquals(new Date(420000), dataCountsReporter.getLatestSparseBucketTime()); - assertEquals(2, dataCountsReporter.getEmptyBucketCount()); - assertEquals(new Date(480000), dataCountsReporter.getLatestEmptyBucketTime()); + assertEquals(10, d.getBucketCount()); + assertEquals(2, d.getSparseBucketCount()); + assertEquals(new Date(420000), d.getLatestSparseBucketTime()); + assertEquals(2, d.getEmptyBucketCount()); + assertEquals(new Date(480000), d.getLatestEmptyBucketTime()); } /** @@ -214,8 +209,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase { * whether counts are right. */ public void testEmptyBucketsLongerOutage() { - DataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); - DataStreamDiagnostics d = new DataStreamDiagnostics(dataCountsReporter, analysisConfig, logger); + DataStreamDiagnostics d = new DataStreamDiagnostics(job); d.checkRecord(10000); d.checkRecord(70000); @@ -230,11 +224,11 @@ public class DataStreamDiagnosticsTests extends ESTestCase { // 98 empty buckets d.checkRecord(6490000); d.flush(); - assertEquals(109, dataCountsReporter.getBucketCount()); - assertEquals(100, dataCountsReporter.getEmptyBucketCount()); - assertEquals(0, dataCountsReporter.getSparseBucketCount()); - assertEquals(null, dataCountsReporter.getLatestSparseBucketTime()); - assertEquals(new Date(6420000), dataCountsReporter.getLatestEmptyBucketTime()); + assertEquals(109, d.getBucketCount()); + assertEquals(100, d.getEmptyBucketCount()); + assertEquals(0, d.getSparseBucketCount()); + assertEquals(null, d.getLatestSparseBucketTime()); + assertEquals(new Date(6420000), d.getLatestEmptyBucketTime()); } /** @@ -243,8 +237,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase { * The number of sparse buckets should not be to much, it could be normal. */ public void testSparseBucketsLongerPeriod() { - DataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); - DataStreamDiagnostics d = new DataStreamDiagnostics(dataCountsReporter, analysisConfig, logger); + DataStreamDiagnostics d = new DataStreamDiagnostics(job); sendManyDataPoints(d, 10000, 69000, 1000); sendManyDataPoints(d, 70000, 129000, 1200); @@ -260,11 +253,11 @@ public class DataStreamDiagnosticsTests extends ESTestCase { sendManyDataPoints(d, 550000, 609000, 1400); d.flush(); - assertEquals(10, dataCountsReporter.getBucketCount()); - assertEquals(0, dataCountsReporter.getEmptyBucketCount()); - assertEquals(2, dataCountsReporter.getSparseBucketCount()); - assertEquals(new Date(420000), dataCountsReporter.getLatestSparseBucketTime()); - assertEquals(null, dataCountsReporter.getLatestEmptyBucketTime()); + assertEquals(10, d.getBucketCount()); + assertEquals(0, d.getEmptyBucketCount()); + assertEquals(2, d.getSparseBucketCount()); + assertEquals(new Date(420000), d.getLatestSparseBucketTime()); + assertEquals(null, d.getLatestEmptyBucketTime()); } private void sendManyDataPoints(DataStreamDiagnostics d, long recordTimestampInMsMin, long recordTimestampInMsMax, long howMuch) { @@ -275,5 +268,4 @@ public class DataStreamDiagnosticsTests extends ESTestCase { d.checkRecord(recordTimestampInMsMin + i % range); } } - } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DummyDataCountsReporter.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DummyDataCountsReporter.java index 28d91a30289..319f5d72a3a 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DummyDataCountsReporter.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/DummyDataCountsReporter.java @@ -6,10 +6,17 @@ package org.elasticsearch.xpack.ml.job.process; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.ml.job.config.Detector; +import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; +import java.util.Arrays; +import java.util.Date; + import static org.mockito.Mockito.mock; /** @@ -19,8 +26,8 @@ class DummyDataCountsReporter extends DataCountsReporter { int logStatusCallCount = 0; - DummyDataCountsReporter() { - super(mock(ThreadPool.class), Settings.EMPTY, "DummyJobId", new DataCounts("DummyJobId"), + DummyDataCountsReporter() { + super(mock(ThreadPool.class), Settings.EMPTY, createJob(), new DataCounts("DummyJobId"), mock(JobDataCountsPersister.class)); } @@ -47,4 +54,17 @@ class DummyDataCountsReporter extends DataCountsReporter { public void close() { // Do nothing } + + private static Job createJob() { + AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder( + Arrays.asList(new Detector.Builder("metric", "field").build())); + acBuilder.setBucketSpan(TimeValue.timeValueSeconds(300)); + acBuilder.setLatency(TimeValue.ZERO); + acBuilder.setDetectors(Arrays.asList(new Detector.Builder("metric", "field").build())); + + Job.Builder builder = new Job.Builder("dummy_job_id"); + builder.setAnalysisConfig(acBuilder); + builder.setCreateTime(new Date()); + return builder.build(); + } } diff --git a/qa/ml-basic-multi-node/src/test/java/org/elasticsearch/xpack/ml/integration/MlBasicMultiNodeIT.java b/qa/ml-basic-multi-node/src/test/java/org/elasticsearch/xpack/ml/integration/MlBasicMultiNodeIT.java index 6a70ca792dd..fef68106898 100644 --- a/qa/ml-basic-multi-node/src/test/java/org/elasticsearch/xpack/ml/integration/MlBasicMultiNodeIT.java +++ b/qa/ml-basic-multi-node/src/test/java/org/elasticsearch/xpack/ml/integration/MlBasicMultiNodeIT.java @@ -61,6 +61,7 @@ public class MlBasicMultiNodeIT extends ESRestTestCase { assertEquals(0, responseBody.get("invalid_date_count")); assertEquals(0, responseBody.get("missing_field_count")); assertEquals(0, responseBody.get("out_of_order_timestamp_count")); + assertEquals(1, responseBody.get("bucket_count")); assertEquals(1403481600000L, responseBody.get("earliest_record_timestamp")); assertEquals(1403481700000L, responseBody.get("latest_record_timestamp")); @@ -85,6 +86,7 @@ public class MlBasicMultiNodeIT extends ESRestTestCase { assertEquals(0, dataCountsDoc.get("invalid_date_count")); assertEquals(0, dataCountsDoc.get("missing_field_count")); assertEquals(0, dataCountsDoc.get("out_of_order_timestamp_count")); + assertEquals(1, dataCountsDoc.get("bucket_count")); assertEquals(1403481600000L, dataCountsDoc.get("earliest_record_timestamp")); assertEquals(1403481700000L, dataCountsDoc.get("latest_record_timestamp")); @@ -160,6 +162,107 @@ public class MlBasicMultiNodeIT extends ESRestTestCase { assertEquals(200, response.getStatusLine().getStatusCode()); } + public void testMiniFarequoteReopen() throws Exception { + String jobId = "foo1_again"; + createFarequoteJob(jobId); + + Response response = client().performRequest("post", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open"); + assertEquals(200, response.getStatusLine().getStatusCode()); + assertEquals(Collections.singletonMap("opened", true), responseEntityToMap(response)); + assertBusy(this::assertSameClusterStateOnAllNodes); + + String postData = + "{\"airline\":\"AAL\",\"responsetime\":\"132.2046\",\"sourcetype\":\"farequote\",\"time\":\"1403481600\"}\n" + + "{\"airline\":\"JZA\",\"responsetime\":\"990.4628\",\"sourcetype\":\"farequote\",\"time\":\"1403481700\"}\n" + + "{\"airline\":\"JBU\",\"responsetime\":\"877.5927\",\"sourcetype\":\"farequote\",\"time\":\"1403481800\"}\n" + + "{\"airline\":\"KLM\",\"responsetime\":\"1355.4812\",\"sourcetype\":\"farequote\",\"time\":\"1403481900\"}\n" + + "{\"airline\":\"NKS\",\"responsetime\":\"9991.3981\",\"sourcetype\":\"farequote\",\"time\":\"1403482000\"}"; + response = client().performRequest("post", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_data", + Collections.emptyMap(), + new StringEntity(postData, randomFrom(ContentType.APPLICATION_JSON, ContentType.create("application/x-ndjson")))); + assertEquals(202, response.getStatusLine().getStatusCode()); + Map responseBody = responseEntityToMap(response); + assertEquals(5, responseBody.get("processed_record_count")); + assertEquals(10, responseBody.get("processed_field_count")); + assertEquals(446, responseBody.get("input_bytes")); + assertEquals(15, responseBody.get("input_field_count")); + assertEquals(0, responseBody.get("invalid_date_count")); + assertEquals(0, responseBody.get("missing_field_count")); + assertEquals(0, responseBody.get("out_of_order_timestamp_count")); + assertEquals(1, responseBody.get("bucket_count")); + assertEquals(1403481600000L, responseBody.get("earliest_record_timestamp")); + assertEquals(1403482000000L, responseBody.get("latest_record_timestamp")); + + response = client().performRequest("post", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_flush"); + assertEquals(200, response.getStatusLine().getStatusCode()); + assertEquals(Collections.singletonMap("flushed", true), responseEntityToMap(response)); + + response = client().performRequest("post", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_close", + Collections.singletonMap("timeout", "20s")); + assertEquals(200, response.getStatusLine().getStatusCode()); + assertEquals(Collections.singletonMap("closed", true), responseEntityToMap(response)); + + response = client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"); + assertEquals(200, response.getStatusLine().getStatusCode()); + + response = client().performRequest("post", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open", + Collections.singletonMap("timeout", "20s")); + assertEquals(200, response.getStatusLine().getStatusCode()); + assertEquals(Collections.singletonMap("opened", true), responseEntityToMap(response)); + assertBusy(this::assertSameClusterStateOnAllNodes); + + // feed some more data points + postData = + "{\"airline\":\"AAL\",\"responsetime\":\"136.2361\",\"sourcetype\":\"farequote\",\"time\":\"1407081600\"}\n" + + "{\"airline\":\"VRD\",\"responsetime\":\"282.9847\",\"sourcetype\":\"farequote\",\"time\":\"1407081700\"}\n" + + "{\"airline\":\"JAL\",\"responsetime\":\"493.0338\",\"sourcetype\":\"farequote\",\"time\":\"1407081800\"}\n" + + "{\"airline\":\"UAL\",\"responsetime\":\"8.4275\",\"sourcetype\":\"farequote\",\"time\":\"1407081900\"}\n" + + "{\"airline\":\"FFT\",\"responsetime\":\"221.8693\",\"sourcetype\":\"farequote\",\"time\":\"1407082000\"}"; + response = client().performRequest("post", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_data", + Collections.emptyMap(), + new StringEntity(postData, randomFrom(ContentType.APPLICATION_JSON, ContentType.create("application/x-ndjson")))); + assertEquals(202, response.getStatusLine().getStatusCode()); + responseBody = responseEntityToMap(response); + assertEquals(5, responseBody.get("processed_record_count")); + assertEquals(10, responseBody.get("processed_field_count")); + assertEquals(442, responseBody.get("input_bytes")); + assertEquals(15, responseBody.get("input_field_count")); + assertEquals(0, responseBody.get("invalid_date_count")); + assertEquals(0, responseBody.get("missing_field_count")); + assertEquals(0, responseBody.get("out_of_order_timestamp_count")); + assertEquals(1, responseBody.get("bucket_count")); + + // unintuitive: should return the earliest record timestamp of this feed??? + assertEquals(null, responseBody.get("earliest_record_timestamp")); + assertEquals(1407082000000L, responseBody.get("latest_record_timestamp")); + + response = client().performRequest("post", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_close", + Collections.singletonMap("timeout", "20s")); + assertEquals(200, response.getStatusLine().getStatusCode()); + assertEquals(Collections.singletonMap("closed", true), responseEntityToMap(response)); + + // counts should be summed up + response = client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"); + assertEquals(200, response.getStatusLine().getStatusCode()); + + @SuppressWarnings("unchecked") + Map dataCountsDoc = (Map) + ((Map)((List) responseEntityToMap(response).get("jobs")).get(0)).get("data_counts"); + assertEquals(10, dataCountsDoc.get("processed_record_count")); + assertEquals(20, dataCountsDoc.get("processed_field_count")); + assertEquals(888, dataCountsDoc.get("input_bytes")); + assertEquals(30, dataCountsDoc.get("input_field_count")); + assertEquals(0, dataCountsDoc.get("invalid_date_count")); + assertEquals(0, dataCountsDoc.get("missing_field_count")); + assertEquals(0, dataCountsDoc.get("out_of_order_timestamp_count")); + assertEquals(2, dataCountsDoc.get("bucket_count")); + assertEquals(1403481600000L, dataCountsDoc.get("earliest_record_timestamp")); + assertEquals(1407082000000L, dataCountsDoc.get("latest_record_timestamp")); + + response = client().performRequest("delete", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId); + assertEquals(200, response.getStatusLine().getStatusCode()); + } + private Response createDatafeed(String datafeedId, String jobId) throws Exception { XContentBuilder xContentBuilder = jsonBuilder(); xContentBuilder.startObject();