[ML] move DataStreamDiagnostics into DataCountsReporter (elastic/x-pack-elasticsearch#775)

repair DataStreamDiagnostics

Moves DataStreamDiagnostics into DataCountsReporter to survive if job is opened/closed/fed in chunks.

relates elastic/x-pack-elasticsearch#764

Original commit: elastic/x-pack-elasticsearch@29c221a451
This commit is contained in:
Hendrik Muhs 2017-03-23 16:43:51 +01:00 committed by GitHub
parent bb9befcdcb
commit 6f7f466fa3
9 changed files with 388 additions and 183 deletions

View File

@ -14,6 +14,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
import java.io.Closeable; import java.io.Closeable;
@ -58,7 +59,7 @@ public class DataCountsReporter extends AbstractComponent implements Closeable {
private static final TimeValue PERSIST_INTERVAL = TimeValue.timeValueMillis(10_000L); private static final TimeValue PERSIST_INTERVAL = TimeValue.timeValueMillis(10_000L);
private final String jobId; private final Job job;
private final JobDataCountsPersister dataCountsPersister; private final JobDataCountsPersister dataCountsPersister;
private final DataCounts totalRecordStats; private final DataCounts totalRecordStats;
@ -78,16 +79,19 @@ public class DataCountsReporter extends AbstractComponent implements Closeable {
private volatile boolean persistDataCountsOnNextRecord; private volatile boolean persistDataCountsOnNextRecord;
private final ThreadPool.Cancellable persistDataCountsDatafeedAction; private final ThreadPool.Cancellable persistDataCountsDatafeedAction;
public DataCountsReporter(ThreadPool threadPool, Settings settings, String jobId, DataCounts counts, private DataStreamDiagnostics diagnostics;
public DataCountsReporter(ThreadPool threadPool, Settings settings, Job job, DataCounts counts,
JobDataCountsPersister dataCountsPersister) { JobDataCountsPersister dataCountsPersister) {
super(settings); super(settings);
this.jobId = jobId; this.job = job;
this.dataCountsPersister = dataCountsPersister; this.dataCountsPersister = dataCountsPersister;
totalRecordStats = counts; totalRecordStats = counts;
incrementalRecordStats = new DataCounts(jobId); incrementalRecordStats = new DataCounts(job.getId());
diagnostics = new DataStreamDiagnostics(job);
acceptablePercentDateParseErrors = ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING.get(settings); acceptablePercentDateParseErrors = ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING.get(settings);
acceptablePercentOutOfOrderErrors = ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING.get(settings); acceptablePercentOutOfOrderErrors = ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING.get(settings);
@ -105,7 +109,7 @@ public class DataCountsReporter extends AbstractComponent implements Closeable {
* @param inputFieldCount Number of fields in the record. * @param inputFieldCount Number of fields in the record.
* Note this is not the number of processed fields (by field etc) * Note this is not the number of processed fields (by field etc)
* but the actual number of fields in the record * but the actual number of fields in the record
* @param recordTimeMs The time of the latest record written * @param recordTimeMs The time of the record written
* in milliseconds from the epoch. * in milliseconds from the epoch.
*/ */
public void reportRecordWritten(long inputFieldCount, long recordTimeMs) { public void reportRecordWritten(long inputFieldCount, long recordTimeMs) {
@ -132,10 +136,14 @@ public class DataCountsReporter extends AbstractComponent implements Closeable {
} }
if (persistDataCountsOnNextRecord) { if (persistDataCountsOnNextRecord) {
retrieveDiagnosticsIntermediateResults();
DataCounts copy = new DataCounts(runningTotalStats()); DataCounts copy = new DataCounts(runningTotalStats());
dataCountsPersister.persistDataCounts(jobId, copy, new LoggingActionListener()); dataCountsPersister.persistDataCounts(job.getId(), copy, new LoggingActionListener());
persistDataCountsOnNextRecord = false; persistDataCountsOnNextRecord = false;
} }
diagnostics.checkRecord(recordTimeMs);
} }
/** /**
@ -191,43 +199,6 @@ public class DataCountsReporter extends AbstractComponent implements Closeable {
incrementalRecordStats.incrementInputFieldCount(inputFieldCount); incrementalRecordStats.incrementInputFieldCount(inputFieldCount);
} }
public void reportEmptyBucket(long recordTimeMs ){
Date recordDate = new Date(recordTimeMs);
totalRecordStats.incrementEmptyBucketCount(1);
totalRecordStats.setLatestEmptyBucketTimeStamp(recordDate);
incrementalRecordStats.incrementEmptyBucketCount(1);
incrementalRecordStats.setLatestEmptyBucketTimeStamp(recordDate);
}
public void reportEmptyBuckets(long additional, long lastRecordTimeMs ){
Date recordDate = new Date(lastRecordTimeMs);
totalRecordStats.incrementEmptyBucketCount(additional);
totalRecordStats.setLatestEmptyBucketTimeStamp(recordDate);
incrementalRecordStats.incrementEmptyBucketCount(additional);
incrementalRecordStats.setLatestEmptyBucketTimeStamp(recordDate);
}
public void reportSparseBucket(long recordTimeMs){
Date recordDate = new Date(recordTimeMs);
totalRecordStats.incrementSparseBucketCount(1);
totalRecordStats.setLatestSparseBucketTimeStamp(recordDate);
incrementalRecordStats.incrementSparseBucketCount(1);
incrementalRecordStats.setLatestSparseBucketTimeStamp(recordDate);
}
public void reportBucket(){
totalRecordStats.incrementBucketCount(1);
incrementalRecordStats.incrementBucketCount(1);
}
public void reportBuckets(long additional){
totalRecordStats.incrementBucketCount(additional);
incrementalRecordStats.incrementBucketCount(additional);
}
/** /**
* Total records seen = records written to the Engine (processed record * Total records seen = records written to the Engine (processed record
* count) + date parse error records count + out of order record count. * count) + date parse error records count + out of order record count.
@ -315,7 +286,9 @@ public class DataCountsReporter extends AbstractComponent implements Closeable {
Date now = new Date(); Date now = new Date();
incrementalRecordStats.setLastDataTimeStamp(now); incrementalRecordStats.setLastDataTimeStamp(now);
totalRecordStats.setLastDataTimeStamp(now); totalRecordStats.setLastDataTimeStamp(now);
dataCountsPersister.persistDataCounts(jobId, runningTotalStats(), new LoggingActionListener()); diagnostics.flush();
retrieveDiagnosticsIntermediateResults();
dataCountsPersister.persistDataCounts(job.getId(), runningTotalStats(), new LoggingActionListener());
} }
/** /**
@ -329,7 +302,7 @@ public class DataCountsReporter extends AbstractComponent implements Closeable {
} }
String status = String.format(Locale.ROOT, String status = String.format(Locale.ROOT,
"[%s] %d records written to autodetect; missingFieldCount=%d, invalidDateCount=%d, outOfOrderCount=%d", jobId, "[%s] %d records written to autodetect; missingFieldCount=%d, invalidDateCount=%d, outOfOrderCount=%d", job.getId(),
getProcessedRecordCount(), getMissingFieldErrorCount(), getDateParseErrorsCount(), getOutOfOrderRecordCount()); getProcessedRecordCount(), getMissingFieldErrorCount(), getDateParseErrorsCount(), getOutOfOrderRecordCount());
logger.info(status); logger.info(status);
@ -386,7 +359,9 @@ public class DataCountsReporter extends AbstractComponent implements Closeable {
} }
public void startNewIncrementalCount() { public void startNewIncrementalCount() {
incrementalRecordStats = new DataCounts(jobId); incrementalRecordStats = new DataCounts(job.getId());
retrieveDiagnosticsIntermediateResults();
diagnostics.resetCounts();
} }
public DataCounts incrementalStats() { public DataCounts incrementalStats() {
@ -404,18 +379,34 @@ public class DataCountsReporter extends AbstractComponent implements Closeable {
persistDataCountsDatafeedAction.cancel(); persistDataCountsDatafeedAction.cancel();
} }
private void retrieveDiagnosticsIntermediateResults() {
totalRecordStats.incrementBucketCount(diagnostics.getEmptyBucketCount());
totalRecordStats.incrementBucketCount(diagnostics.getBucketCount());
totalRecordStats.incrementSparseBucketCount(diagnostics.getSparseBucketCount());
totalRecordStats.updateLatestEmptyBucketTimeStamp(diagnostics.getLatestEmptyBucketTime());
totalRecordStats.updateLatestSparseBucketTimeStamp(diagnostics.getLatestSparseBucketTime());
incrementalRecordStats.incrementEmptyBucketCount(diagnostics.getEmptyBucketCount());
incrementalRecordStats.incrementBucketCount(diagnostics.getBucketCount());
incrementalRecordStats.incrementSparseBucketCount(diagnostics.getSparseBucketCount());
incrementalRecordStats.updateLatestEmptyBucketTimeStamp(diagnostics.getLatestEmptyBucketTime());
incrementalRecordStats.updateLatestSparseBucketTimeStamp(diagnostics.getLatestSparseBucketTime());
diagnostics.resetCounts();
}
/** /**
* Log success/error * Log success/error
*/ */
private class LoggingActionListener implements ActionListener<Boolean> { private class LoggingActionListener implements ActionListener<Boolean> {
@Override @Override
public void onResponse(Boolean aBoolean) { public void onResponse(Boolean aBoolean) {
logger.trace("[{}] Persisted DataCounts", jobId); logger.trace("[{}] Persisted DataCounts", job.getId());
} }
@Override @Override
public void onFailure(Exception e) { public void onFailure(Exception e) {
logger.debug(new ParameterizedMessage("[{}] Error persisting DataCounts stats", jobId), e); logger.debug(new ParameterizedMessage("[{}] Error persisting DataCounts stats", job.getId()), e);
} }
} }
} }

View File

@ -6,9 +6,11 @@
package org.elasticsearch.xpack.ml.job.process; package org.elasticsearch.xpack.ml.job.process;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.apache.lucene.util.Counter; import org.apache.lucene.util.Counter;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.xpack.ml.job.config.Job;
import java.util.Date;
import java.util.SortedMap; import java.util.SortedMap;
import java.util.TreeMap; import java.util.TreeMap;
@ -29,9 +31,7 @@ public class DataStreamDiagnostics {
private static final int DATA_SPARSITY_THRESHOLD = 2; private static final int DATA_SPARSITY_THRESHOLD = 2;
private static final long MS_IN_SECOND = 1000; private static final long MS_IN_SECOND = 1000;
private final DataCountsReporter dataCountsReporter; private static final Logger LOGGER = Loggers.getLogger(DataStreamDiagnostics.class);
private final Logger logger;
/** /**
* Container for the histogram * Container for the histogram
* *
@ -42,19 +42,27 @@ public class DataStreamDiagnostics {
* The container gets pruned along the data streaming based on the bucket * The container gets pruned along the data streaming based on the bucket
* window, so it should not contain more than max(MIN_BUCKET_WINDOW, * window, so it should not contain more than max(MIN_BUCKET_WINDOW,
* 'buckets_required_by_latency') + 1 items at any time. * 'buckets_required_by_latency') + 1 items at any time.
*
* Sparsity can only be calculated after the window has been filled. Currently
* this window is lost if a job gets closed and re-opened. We might fix this
* in future.
*/ */
private final SortedMap<Long, Counter> movingBucketHistogram = new TreeMap<>(); private final SortedMap<Long, Counter> movingBucketHistogram = new TreeMap<>();
private final long bucketSpan; private final long bucketSpan;
private final long latency; private final long latency;
private long movingBucketCount = 0; private long movingBucketCount = 0;
private long lastReportedBucket = -1; private long latestReportedBucket = -1;
public DataStreamDiagnostics(DataCountsReporter dataCountsReporter, AnalysisConfig analysisConfig, Logger logger) { private long bucketCount = 0;
this.dataCountsReporter = dataCountsReporter; private long emptyBucketCount = 0;
this.logger = logger; private long latestEmptyBucketTime = -1;
bucketSpan = analysisConfig.getBucketSpan().seconds(); private long sparseBucketCount = 0;
latency = analysisConfig.getLatency().seconds(); private long latestSparseBucketTime = -1;
public DataStreamDiagnostics(Job job) {
bucketSpan = job.getAnalysisConfig().getBucketSpan().seconds();
latency = job.getAnalysisConfig().getLatency().seconds();
} }
/** /**
@ -92,8 +100,8 @@ public class DataStreamDiagnostics {
++movingBucketCount; ++movingBucketCount;
// find the very first bucket // find the very first bucket
if (lastReportedBucket == -1) { if (latestReportedBucket == -1) {
lastReportedBucket = bucket - 1; latestReportedBucket = bucket - 1;
} }
// flush all buckets out of the window // flush all buckets out of the window
@ -103,17 +111,18 @@ public class DataStreamDiagnostics {
/** /**
* Flush Bucket reporting till the given bucket. * Flush Bucket reporting till the given bucket.
* *
* @param bucketTimeStamp * @param bucketNumber
* The timestamp of the last bucket that can be flushed. * The number of the last bucket that can be flushed.
*/ */
private void flush(long bucketTimeStamp) { private void flush(long bucketNumber) {
// check for a longer period of empty buckets // check for a longer period of empty buckets
long emptyBuckets = movingBucketHistogram.firstKey() - lastReportedBucket - 1; long emptyBuckets = movingBucketHistogram.firstKey() - latestReportedBucket - 1;
if (emptyBuckets > 0) { if (emptyBuckets > 0) {
dataCountsReporter.reportBuckets(emptyBuckets); bucketCount += emptyBuckets;
dataCountsReporter.reportEmptyBuckets(emptyBuckets, (movingBucketHistogram.firstKey() - 1) * bucketSpan * MS_IN_SECOND); emptyBucketCount += emptyBuckets;
lastReportedBucket = movingBucketHistogram.firstKey() - 1; latestEmptyBucketTime = (movingBucketHistogram.firstKey() - 1) * bucketSpan * MS_IN_SECOND;
latestReportedBucket = movingBucketHistogram.firstKey() - 1;
} }
// calculate the average number of data points in a bucket based on the // calculate the average number of data points in a bucket based on the
@ -121,23 +130,24 @@ public class DataStreamDiagnostics {
double averageBucketSize = (float) movingBucketCount / movingBucketHistogram.size(); double averageBucketSize = (float) movingBucketCount / movingBucketHistogram.size();
// prune all buckets that can be flushed // prune all buckets that can be flushed
long lastBucketSparsityCheck = Math.min(bucketTimeStamp, movingBucketHistogram.lastKey()); long lastBucketSparsityCheck = Math.min(bucketNumber, movingBucketHistogram.lastKey());
for (long pruneBucket = movingBucketHistogram.firstKey(); pruneBucket < lastBucketSparsityCheck; ++pruneBucket) { for (long pruneBucket = movingBucketHistogram.firstKey(); pruneBucket < lastBucketSparsityCheck; ++pruneBucket) {
Counter bucketSizeHolder = movingBucketHistogram.remove(pruneBucket); Counter bucketSizeHolder = movingBucketHistogram.remove(pruneBucket);
long bucketSize = bucketSizeHolder != null ? bucketSizeHolder.get() : 0L; long bucketSize = bucketSizeHolder != null ? bucketSizeHolder.get() : 0L;
logger.debug("Checking bucket {} compare sizes, this bucket: {} average: {}", pruneBucket, bucketSize, averageBucketSize); LOGGER.debug("Checking bucket {} compare sizes, this bucket: {} average: {}", pruneBucket, bucketSize, averageBucketSize);
dataCountsReporter.reportBucket(); ++bucketCount;
lastReportedBucket = pruneBucket; latestReportedBucket = pruneBucket;
// subtract bucketSize from the counter // subtract bucketSize from the counter
movingBucketCount -= bucketSize; movingBucketCount -= bucketSize;
// check if bucket is empty // check if bucket is empty
if (bucketSize == 0L) { if (bucketSize == 0L) {
dataCountsReporter.reportEmptyBucket(pruneBucket * bucketSpan * MS_IN_SECOND); latestEmptyBucketTime = pruneBucket * bucketSpan * MS_IN_SECOND;
++emptyBucketCount;
// do not do sparse analysis on an empty bucket // do not do sparse analysis on an empty bucket
continue; continue;
@ -150,28 +160,62 @@ public class DataStreamDiagnostics {
double sparsityScore = logAverageBucketSize - logBucketSize; double sparsityScore = logAverageBucketSize - logBucketSize;
if (sparsityScore > DATA_SPARSITY_THRESHOLD) { if (sparsityScore > DATA_SPARSITY_THRESHOLD) {
logger.debug("Sparse bucket {}, this bucket: {} average: {}, sparsity score: {}", pruneBucket, bucketSize, LOGGER.debug("Sparse bucket {}, this bucket: {} average: {}, sparsity score: {}", pruneBucket, bucketSize,
averageBucketSize, sparsityScore); averageBucketSize, sparsityScore);
dataCountsReporter.reportSparseBucket(pruneBucket * bucketSpan * MS_IN_SECOND); ++sparseBucketCount;
latestSparseBucketTime = pruneBucket * bucketSpan * MS_IN_SECOND;
} }
} }
// prune the rest if necessary // prune the rest if necessary
for (long pruneBucket = lastBucketSparsityCheck; pruneBucket < bucketTimeStamp; ++pruneBucket) { for (long pruneBucket = lastBucketSparsityCheck; pruneBucket < bucketNumber; ++pruneBucket) {
Counter bucketSizeHolder = movingBucketHistogram.remove(pruneBucket); Counter bucketSizeHolder = movingBucketHistogram.remove(pruneBucket);
long bucketSize = bucketSizeHolder != null ? bucketSizeHolder.get() : 0L; long bucketSize = bucketSizeHolder != null ? bucketSizeHolder.get() : 0L;
dataCountsReporter.reportBucket(); bucketCount++;
lastReportedBucket = pruneBucket; latestReportedBucket = pruneBucket;
// subtract bucketSize from the counter // subtract bucketSize from the counter
movingBucketCount -= bucketSize; movingBucketCount -= bucketSize;
// check if bucket is empty // check if bucket is empty
if (bucketSize == 0L) { if (bucketSize == 0L) {
dataCountsReporter.reportEmptyBucket(pruneBucket * bucketSpan * MS_IN_SECOND); latestEmptyBucketTime = pruneBucket * bucketSpan * MS_IN_SECOND;
++emptyBucketCount;
} }
} }
} }
public long getBucketCount() {
return bucketCount;
}
public long getEmptyBucketCount() {
return emptyBucketCount;
}
public Date getLatestEmptyBucketTime() {
return latestEmptyBucketTime > 0 ? new Date(latestEmptyBucketTime) : null;
}
public long getSparseBucketCount() {
return sparseBucketCount;
}
public Date getLatestSparseBucketTime() {
return latestSparseBucketTime > 0 ? new Date(latestSparseBucketTime) : null;
}
/**
* Resets counts.
*
* Note: This does not reset the inner state for e.g. sparse bucket
* detection.
*
*/
public void resetCounts() {
bucketCount = 0;
emptyBucketCount = 0;
sparseBucketCount = 0;
}
} }

View File

@ -256,7 +256,7 @@ public class AutodetectProcessManager extends AbstractComponent {
Job job = jobManager.getJobOrThrowIfUnknown(jobId); Job job = jobManager.getJobOrThrowIfUnknown(jobId);
// A TP with no queue, so that we fail immediately if there are no threads available // A TP with no queue, so that we fail immediately if there are no threads available
ExecutorService executorService = threadPool.executor(MachineLearning.AUTODETECT_PROCESS_THREAD_POOL_NAME); ExecutorService executorService = threadPool.executor(MachineLearning.AUTODETECT_PROCESS_THREAD_POOL_NAME);
try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job.getId(), dataCounts, try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job, dataCounts,
jobDataCountsPersister)) { jobDataCountsPersister)) {
ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobProvider, new JobRenormalizedResultsPersister(settings, client), ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobProvider, new JobRenormalizedResultsPersister(settings, client),
normalizerFactory); normalizerFactory);

View File

@ -442,6 +442,14 @@ public class DataCounts extends ToXContentToBytes implements Writeable {
this.latestRecordTimeStamp = latestRecordTimeStamp; this.latestRecordTimeStamp = latestRecordTimeStamp;
} }
public void updateLatestRecordTimeStamp(Date latestRecordTimeStamp) {
if (latestRecordTimeStamp != null &&
(this.latestRecordTimeStamp == null ||
latestRecordTimeStamp.after(this.latestRecordTimeStamp))) {
this.latestRecordTimeStamp = latestRecordTimeStamp;
}
}
/** /**
* The wall clock time the latest record was seen. * The wall clock time the latest record was seen.
* *
@ -468,6 +476,13 @@ public class DataCounts extends ToXContentToBytes implements Writeable {
this.latestEmptyBucketTimeStamp = latestEmptyBucketTimeStamp; this.latestEmptyBucketTimeStamp = latestEmptyBucketTimeStamp;
} }
public void updateLatestEmptyBucketTimeStamp(Date latestEmptyBucketTimeStamp) {
if (latestEmptyBucketTimeStamp != null &&
(this.latestEmptyBucketTimeStamp == null ||
latestEmptyBucketTimeStamp.after(this.latestEmptyBucketTimeStamp))) {
this.latestEmptyBucketTimeStamp = latestEmptyBucketTimeStamp;
}
}
/** /**
* The time of the latest sparse bucket seen. * The time of the latest sparse bucket seen.
@ -482,6 +497,14 @@ public class DataCounts extends ToXContentToBytes implements Writeable {
this.latestSparseBucketTimeStamp = latestSparseBucketTimeStamp; this.latestSparseBucketTimeStamp = latestSparseBucketTimeStamp;
} }
public void updateLatestSparseBucketTimeStamp(Date latestSparseBucketTimeStamp) {
if (latestSparseBucketTimeStamp != null &&
(this.latestSparseBucketTimeStamp == null ||
latestSparseBucketTimeStamp.after(this.latestSparseBucketTimeStamp))) {
this.latestSparseBucketTimeStamp = latestSparseBucketTimeStamp;
}
}
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
out.writeString(jobId); out.writeString(jobId);

View File

@ -9,7 +9,6 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription; import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.DataStreamDiagnostics;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import java.io.IOException; import java.io.IOException;
@ -41,7 +40,6 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter
private final Logger logger; private final Logger logger;
private final DateTransformer dateTransformer; private final DateTransformer dateTransformer;
private final DataStreamDiagnostics diagnostics;
private long latencySeconds; private long latencySeconds;
protected Map<String, Integer> inFieldIndexes; protected Map<String, Integer> inFieldIndexes;
@ -60,7 +58,6 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter
this.analysisConfig = Objects.requireNonNull(analysisConfig); this.analysisConfig = Objects.requireNonNull(analysisConfig);
this.dataCountsReporter = Objects.requireNonNull(dataCountsReporter); this.dataCountsReporter = Objects.requireNonNull(dataCountsReporter);
this.logger = Objects.requireNonNull(logger); this.logger = Objects.requireNonNull(logger);
this.diagnostics = new DataStreamDiagnostics(this.dataCountsReporter, this.analysisConfig, this.logger);
this.latencySeconds = analysisConfig.getLatency().seconds(); this.latencySeconds = analysisConfig.getLatency().seconds();
Date date = dataCountsReporter.getLatestRecordTime(); Date date = dataCountsReporter.getLatestRecordTime();
@ -158,11 +155,8 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter
latestEpochMs = Math.max(latestEpochMs, epochMs); latestEpochMs = Math.max(latestEpochMs, epochMs);
latestEpochMsThisUpload = latestEpochMs; latestEpochMsThisUpload = latestEpochMs;
// check record in diagnostics
diagnostics.checkRecord(epochMs);
autodetectProcess.writeRecord(record); autodetectProcess.writeRecord(record);
dataCountsReporter.reportRecordWritten(numberOfFieldsRead, latestEpochMs); dataCountsReporter.reportRecordWritten(numberOfFieldsRead, epochMs);
return true; return true;
} }

View File

@ -6,9 +6,13 @@
package org.elasticsearch.xpack.ml.job.process; package org.elasticsearch.xpack.ml.job.process;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.Environment; import org.elasticsearch.env.Environment;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.junit.Before; import org.junit.Before;
@ -16,6 +20,7 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Mockito; import org.mockito.Mockito;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.Date; import java.util.Date;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
@ -26,10 +31,10 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
public class DataCountsReporterTests extends ESTestCase { public class DataCountsReporterTests extends ESTestCase {
private static final String JOB_ID = "SR";
private static final int MAX_PERCENT_DATE_PARSE_ERRORS = 40; private static final int MAX_PERCENT_DATE_PARSE_ERRORS = 40;
private static final int MAX_PERCENT_OUT_OF_ORDER_ERRORS = 30; private static final int MAX_PERCENT_OUT_OF_ORDER_ERRORS = 30;
private Job job;
private JobDataCountsPersister jobDataCountsPersister; private JobDataCountsPersister jobDataCountsPersister;
private ThreadPool threadPool; private ThreadPool threadPool;
private Settings settings; private Settings settings;
@ -40,6 +45,17 @@ public class DataCountsReporterTests extends ESTestCase {
.put(DataCountsReporter.ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING.getKey(), MAX_PERCENT_DATE_PARSE_ERRORS) .put(DataCountsReporter.ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING.getKey(), MAX_PERCENT_DATE_PARSE_ERRORS)
.put(DataCountsReporter.ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING.getKey(), MAX_PERCENT_OUT_OF_ORDER_ERRORS) .put(DataCountsReporter.ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING.getKey(), MAX_PERCENT_OUT_OF_ORDER_ERRORS)
.build(); .build();
AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("metric", "field").build()));
acBuilder.setBucketSpan(TimeValue.timeValueSeconds(300));
acBuilder.setLatency(TimeValue.ZERO);
acBuilder.setDetectors(Arrays.asList(new Detector.Builder("metric", "field").build()));
Job.Builder builder = new Job.Builder("sr");
builder.setAnalysisConfig(acBuilder);
builder.setCreateTime(new Date());
job = builder.build();
jobDataCountsPersister = Mockito.mock(JobDataCountsPersister.class); jobDataCountsPersister = Mockito.mock(JobDataCountsPersister.class);
threadPool = Mockito.mock(ThreadPool.class); threadPool = Mockito.mock(ThreadPool.class);
@ -56,7 +72,7 @@ public class DataCountsReporterTests extends ESTestCase {
} }
public void testSettingAcceptablePercentages() throws IOException { public void testSettingAcceptablePercentages() throws IOException {
try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, JOB_ID, new DataCounts(JOB_ID), try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job, new DataCounts(job.getId()),
jobDataCountsPersister)) { jobDataCountsPersister)) {
assertEquals(dataCountsReporter.getAcceptablePercentDateParseErrors(), MAX_PERCENT_DATE_PARSE_ERRORS); assertEquals(dataCountsReporter.getAcceptablePercentDateParseErrors(), MAX_PERCENT_DATE_PARSE_ERRORS);
assertEquals(dataCountsReporter.getAcceptablePercentOutOfOrderErrors(), MAX_PERCENT_OUT_OF_ORDER_ERRORS); assertEquals(dataCountsReporter.getAcceptablePercentOutOfOrderErrors(), MAX_PERCENT_OUT_OF_ORDER_ERRORS);
@ -64,7 +80,7 @@ public class DataCountsReporterTests extends ESTestCase {
} }
public void testSimpleConstructor() throws Exception { public void testSimpleConstructor() throws Exception {
try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, JOB_ID, new DataCounts(JOB_ID), try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job, new DataCounts(job.getId()),
jobDataCountsPersister)) { jobDataCountsPersister)) {
DataCounts stats = dataCountsReporter.incrementalStats(); DataCounts stats = dataCountsReporter.incrementalStats();
assertNotNull(stats); assertNotNull(stats);
@ -77,7 +93,7 @@ public class DataCountsReporterTests extends ESTestCase {
new Date(), new Date(), new Date(), new Date(), new Date()); new Date(), new Date(), new Date(), new Date(), new Date());
try (DataCountsReporter dataCountsReporter = try (DataCountsReporter dataCountsReporter =
new DataCountsReporter(threadPool, settings, JOB_ID, counts, jobDataCountsPersister)) { new DataCountsReporter(threadPool, settings, job, counts, jobDataCountsPersister)) {
DataCounts stats = dataCountsReporter.incrementalStats(); DataCounts stats = dataCountsReporter.incrementalStats();
assertNotNull(stats); assertNotNull(stats);
assertAllCountFieldsEqualZero(stats); assertAllCountFieldsEqualZero(stats);
@ -95,7 +111,7 @@ public class DataCountsReporterTests extends ESTestCase {
} }
public void testResetIncrementalCounts() throws Exception { public void testResetIncrementalCounts() throws Exception {
try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, JOB_ID, new DataCounts(JOB_ID), try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job, new DataCounts(job.getId()),
jobDataCountsPersister)) { jobDataCountsPersister)) {
DataCounts stats = dataCountsReporter.incrementalStats(); DataCounts stats = dataCountsReporter.incrementalStats();
assertNotNull(stats); assertNotNull(stats);
@ -117,11 +133,33 @@ public class DataCountsReporterTests extends ESTestCase {
stats = dataCountsReporter.incrementalStats(); stats = dataCountsReporter.incrementalStats();
assertNotNull(stats); assertNotNull(stats);
assertAllCountFieldsEqualZero(stats); assertAllCountFieldsEqualZero(stats);
// write some more data
dataCountsReporter.reportRecordWritten(5, 302000);
dataCountsReporter.reportRecordWritten(5, 302000);
assertEquals(2, dataCountsReporter.incrementalStats().getInputRecordCount());
assertEquals(10, dataCountsReporter.incrementalStats().getInputFieldCount());
assertEquals(2, dataCountsReporter.incrementalStats().getProcessedRecordCount());
assertEquals(6, dataCountsReporter.incrementalStats().getProcessedFieldCount());
assertEquals(302000, dataCountsReporter.incrementalStats().getLatestRecordTimeStamp().getTime());
// check total stats
assertEquals(4, dataCountsReporter.runningTotalStats().getInputRecordCount());
assertEquals(20, dataCountsReporter.runningTotalStats().getInputFieldCount());
assertEquals(4, dataCountsReporter.runningTotalStats().getProcessedRecordCount());
assertEquals(12, dataCountsReporter.runningTotalStats().getProcessedFieldCount());
assertEquals(302000, dataCountsReporter.runningTotalStats().getLatestRecordTimeStamp().getTime());
// send 'flush' signal
dataCountsReporter.finishReporting();
assertEquals(2, dataCountsReporter.runningTotalStats().getBucketCount());
assertEquals(0, dataCountsReporter.runningTotalStats().getEmptyBucketCount());
assertEquals(0, dataCountsReporter.runningTotalStats().getSparseBucketCount());
} }
} }
public void testReportLatestTimeIncrementalStats() throws IOException { public void testReportLatestTimeIncrementalStats() throws IOException {
try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, JOB_ID, new DataCounts(JOB_ID), try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job, new DataCounts(job.getId()),
jobDataCountsPersister)) { jobDataCountsPersister)) {
dataCountsReporter.startNewIncrementalCount(); dataCountsReporter.startNewIncrementalCount();
dataCountsReporter.reportLatestTimeIncrementalStats(5001L); dataCountsReporter.reportLatestTimeIncrementalStats(5001L);
@ -130,7 +168,7 @@ public class DataCountsReporterTests extends ESTestCase {
} }
public void testReportRecordsWritten() { public void testReportRecordsWritten() {
try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, JOB_ID, new DataCounts(JOB_ID), try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job, new DataCounts(job.getId()),
jobDataCountsPersister)) { jobDataCountsPersister)) {
dataCountsReporter.setAnalysedFieldsPerRecord(3); dataCountsReporter.setAnalysedFieldsPerRecord(3);
@ -248,12 +286,12 @@ public class DataCountsReporterTests extends ESTestCase {
} }
public void testFinishReporting() { public void testFinishReporting() {
try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, JOB_ID, new DataCounts(JOB_ID), try (DataCountsReporter dataCountsReporter = new DataCountsReporter(threadPool, settings, job, new DataCounts(job.getId()),
jobDataCountsPersister)) { jobDataCountsPersister)) {
dataCountsReporter.setAnalysedFieldsPerRecord(3); dataCountsReporter.setAnalysedFieldsPerRecord(3);
Date now = new Date(); Date now = new Date();
DataCounts dc = new DataCounts(JOB_ID, 2L, 5L, 0L, 10L, 0L, 1L, 0L, 0L, 0L, 0L, new Date(2000), new Date(3000), DataCounts dc = new DataCounts(job.getId(), 2L, 5L, 0L, 10L, 0L, 1L, 0L, 0L, 0L, 1L, new Date(2000), new Date(3000),
now, (Date) null, (Date) null); now, (Date) null, (Date) null);
dataCountsReporter.reportRecordWritten(5, 2000); dataCountsReporter.reportRecordWritten(5, 2000);
dataCountsReporter.reportRecordWritten(5, 3000); dataCountsReporter.reportRecordWritten(5, 3000);
@ -267,7 +305,7 @@ public class DataCountsReporterTests extends ESTestCase {
dataCountsReporter.runningTotalStats().getLastDataTimeStamp()); dataCountsReporter.runningTotalStats().getLastDataTimeStamp());
dc.setLastDataTimeStamp(dataCountsReporter.incrementalStats().getLastDataTimeStamp()); dc.setLastDataTimeStamp(dataCountsReporter.incrementalStats().getLastDataTimeStamp());
Mockito.verify(jobDataCountsPersister, Mockito.times(1)).persistDataCounts(eq("SR"), eq(dc), any()); Mockito.verify(jobDataCountsPersister, Mockito.times(1)).persistDataCounts(eq("sr"), eq(dc), any());
assertEquals(dc, dataCountsReporter.incrementalStats()); assertEquals(dc, dataCountsReporter.incrementalStats());
} }
} }
@ -289,7 +327,7 @@ public class DataCountsReporterTests extends ESTestCase {
} }
}); });
try (DataCountsReporter dataCountsReporter = new DataCountsReporter(mockThreadPool, settings, JOB_ID, new DataCounts(JOB_ID), try (DataCountsReporter dataCountsReporter = new DataCountsReporter(mockThreadPool, settings, job, new DataCounts(job.getId()),
jobDataCountsPersister)) { jobDataCountsPersister)) {
dataCountsReporter.setAnalysedFieldsPerRecord(3); dataCountsReporter.setAnalysedFieldsPerRecord(3);
@ -297,10 +335,10 @@ public class DataCountsReporterTests extends ESTestCase {
dataCountsReporter.reportRecordWritten(5, 2000); dataCountsReporter.reportRecordWritten(5, 2000);
dataCountsReporter.reportRecordWritten(5, 3000); dataCountsReporter.reportRecordWritten(5, 3000);
Mockito.verify(jobDataCountsPersister, Mockito.times(0)).persistDataCounts(eq("SR"), any(), any()); Mockito.verify(jobDataCountsPersister, Mockito.times(0)).persistDataCounts(eq("sr"), any(), any());
argumentCaptor.getValue().run(); argumentCaptor.getValue().run();
dataCountsReporter.reportRecordWritten(5, 4000); dataCountsReporter.reportRecordWritten(5, 4000);
Mockito.verify(jobDataCountsPersister, Mockito.times(1)).persistDataCounts(eq("SR"), any(), any()); Mockito.verify(jobDataCountsPersister, Mockito.times(1)).persistDataCounts(eq("sr"), any(), any());
} }
} }

View File

@ -5,12 +5,11 @@
*/ */
package org.elasticsearch.xpack.ml.job.process; package org.elasticsearch.xpack.ml.job.process;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.Detector; import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.junit.Before; import org.junit.Before;
import java.io.IOException; import java.io.IOException;
@ -19,21 +18,23 @@ import java.util.Date;
public class DataStreamDiagnosticsTests extends ESTestCase { public class DataStreamDiagnosticsTests extends ESTestCase {
private AnalysisConfig analysisConfig; private Job job;
private Logger logger;
@Before @Before
public void setUpMocks() throws IOException { public void setUpMocks() throws IOException {
logger = Loggers.getLogger(DataStreamDiagnosticsTests.class);
AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("metric", "field").build())); AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(Arrays.asList(new Detector.Builder("metric", "field").build()));
acBuilder.setBucketSpan(TimeValue.timeValueSeconds(60)); acBuilder.setBucketSpan(TimeValue.timeValueSeconds(60));
analysisConfig = acBuilder.build(); acBuilder.setLatency(TimeValue.ZERO);
acBuilder.setDetectors(Arrays.asList(new Detector.Builder("metric", "field").build()));
Job.Builder builder = new Job.Builder("job_id");
builder.setAnalysisConfig(acBuilder);
builder.setCreateTime(new Date());
job = builder.build();
} }
public void testSimple() { public void testSimple() {
DataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); DataStreamDiagnostics d = new DataStreamDiagnostics(job);
DataStreamDiagnostics d = new DataStreamDiagnostics(dataCountsReporter, analysisConfig, logger);
d.checkRecord(70000); d.checkRecord(70000);
d.checkRecord(130000); d.checkRecord(130000);
@ -47,16 +48,15 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
d.checkRecord(610000); d.checkRecord(610000);
d.flush(); d.flush();
assertEquals(10, dataCountsReporter.getBucketCount()); assertEquals(10, d.getBucketCount());
assertEquals(0, dataCountsReporter.getEmptyBucketCount()); assertEquals(0, d.getEmptyBucketCount());
assertEquals(0, dataCountsReporter.getSparseBucketCount()); assertEquals(0, d.getSparseBucketCount());
assertEquals(null, dataCountsReporter.getLatestSparseBucketTime()); assertEquals(null, d.getLatestSparseBucketTime());
assertEquals(null, dataCountsReporter.getLatestEmptyBucketTime()); assertEquals(null, d.getLatestEmptyBucketTime());
} }
public void testEmptyBuckets() { public void testEmptyBuckets() {
DataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); DataStreamDiagnostics d = new DataStreamDiagnostics(job);
DataStreamDiagnostics d = new DataStreamDiagnostics(dataCountsReporter, analysisConfig, logger);
d.checkRecord(10000); d.checkRecord(10000);
d.checkRecord(70000); d.checkRecord(70000);
@ -70,16 +70,15 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
d.checkRecord(550000); d.checkRecord(550000);
d.flush(); d.flush();
assertEquals(10, dataCountsReporter.getBucketCount()); assertEquals(10, d.getBucketCount());
assertEquals(2, dataCountsReporter.getEmptyBucketCount()); assertEquals(2, d.getEmptyBucketCount());
assertEquals(0, dataCountsReporter.getSparseBucketCount()); assertEquals(0, d.getSparseBucketCount());
assertEquals(null, dataCountsReporter.getLatestSparseBucketTime()); assertEquals(null, d.getLatestSparseBucketTime());
assertEquals(new Date(420000), dataCountsReporter.getLatestEmptyBucketTime()); assertEquals(new Date(420000), d.getLatestEmptyBucketTime());
} }
public void testEmptyBucketsStartLater() { public void testEmptyBucketsStartLater() {
DataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); DataStreamDiagnostics d = new DataStreamDiagnostics(job);
DataStreamDiagnostics d = new DataStreamDiagnostics(dataCountsReporter, analysisConfig, logger);
d.checkRecord(1110000); d.checkRecord(1110000);
d.checkRecord(1170000); d.checkRecord(1170000);
@ -93,16 +92,15 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
d.checkRecord(1650000); d.checkRecord(1650000);
d.flush(); d.flush();
assertEquals(10, dataCountsReporter.getBucketCount()); assertEquals(10, d.getBucketCount());
assertEquals(2, dataCountsReporter.getEmptyBucketCount()); assertEquals(2, d.getEmptyBucketCount());
assertEquals(0, dataCountsReporter.getSparseBucketCount()); assertEquals(0, d.getSparseBucketCount());
assertEquals(null, dataCountsReporter.getLatestSparseBucketTime()); assertEquals(null, d.getLatestSparseBucketTime());
assertEquals(new Date(1500000), dataCountsReporter.getLatestEmptyBucketTime()); assertEquals(new Date(1500000), d.getLatestEmptyBucketTime());
} }
public void testSparseBuckets() { public void testSparseBuckets() {
DataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); DataStreamDiagnostics d = new DataStreamDiagnostics(job);
DataStreamDiagnostics d = new DataStreamDiagnostics(dataCountsReporter, analysisConfig, logger);
sendManyDataPoints(d, 10000, 69000, 1000); sendManyDataPoints(d, 10000, 69000, 1000);
sendManyDataPoints(d, 70000, 129000, 1200); sendManyDataPoints(d, 70000, 129000, 1200);
@ -118,11 +116,11 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
sendManyDataPoints(d, 550000, 609000, 1400); sendManyDataPoints(d, 550000, 609000, 1400);
d.flush(); d.flush();
assertEquals(10, dataCountsReporter.getBucketCount()); assertEquals(10, d.getBucketCount());
assertEquals(0, dataCountsReporter.getEmptyBucketCount()); assertEquals(0, d.getEmptyBucketCount());
assertEquals(2, dataCountsReporter.getSparseBucketCount()); assertEquals(2, d.getSparseBucketCount());
assertEquals(new Date(420000), dataCountsReporter.getLatestSparseBucketTime()); assertEquals(new Date(420000), d.getLatestSparseBucketTime());
assertEquals(null, dataCountsReporter.getLatestEmptyBucketTime()); assertEquals(null, d.getLatestEmptyBucketTime());
} }
/** /**
@ -130,8 +128,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
* signal * signal
*/ */
public void testSparseBucketsLast() { public void testSparseBucketsLast() {
DataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); DataStreamDiagnostics d = new DataStreamDiagnostics(job);
DataStreamDiagnostics d = new DataStreamDiagnostics(dataCountsReporter, analysisConfig, logger);
sendManyDataPoints(d, 10000, 69000, 1000); sendManyDataPoints(d, 10000, 69000, 1000);
sendManyDataPoints(d, 70000, 129000, 1200); sendManyDataPoints(d, 70000, 129000, 1200);
@ -147,11 +144,11 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
sendManyDataPoints(d, 550000, 609000, 10); sendManyDataPoints(d, 550000, 609000, 10);
d.flush(); d.flush();
assertEquals(10, dataCountsReporter.getBucketCount()); assertEquals(10, d.getBucketCount());
assertEquals(0, dataCountsReporter.getEmptyBucketCount()); assertEquals(0, d.getEmptyBucketCount());
assertEquals(1, dataCountsReporter.getSparseBucketCount()); assertEquals(1, d.getSparseBucketCount());
assertEquals(new Date(120000), dataCountsReporter.getLatestSparseBucketTime()); assertEquals(new Date(120000), d.getLatestSparseBucketTime());
assertEquals(null, dataCountsReporter.getLatestEmptyBucketTime()); assertEquals(null, d.getLatestEmptyBucketTime());
} }
/** /**
@ -159,8 +156,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
* signal on the 2nd to last * signal on the 2nd to last
*/ */
public void testSparseBucketsLastTwo() { public void testSparseBucketsLastTwo() {
DataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); DataStreamDiagnostics d = new DataStreamDiagnostics(job);
DataStreamDiagnostics d = new DataStreamDiagnostics(dataCountsReporter, analysisConfig, logger);
sendManyDataPoints(d, 10000, 69000, 1000); sendManyDataPoints(d, 10000, 69000, 1000);
sendManyDataPoints(d, 70000, 129000, 1200); sendManyDataPoints(d, 70000, 129000, 1200);
@ -177,16 +173,15 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
sendManyDataPoints(d, 550000, 609000, 10); sendManyDataPoints(d, 550000, 609000, 10);
d.flush(); d.flush();
assertEquals(10, dataCountsReporter.getBucketCount()); assertEquals(10, d.getBucketCount());
assertEquals(0, dataCountsReporter.getEmptyBucketCount()); assertEquals(0, d.getEmptyBucketCount());
assertEquals(2, dataCountsReporter.getSparseBucketCount()); assertEquals(2, d.getSparseBucketCount());
assertEquals(new Date(480000), dataCountsReporter.getLatestSparseBucketTime()); assertEquals(new Date(480000), d.getLatestSparseBucketTime());
assertEquals(null, dataCountsReporter.getLatestEmptyBucketTime()); assertEquals(null, d.getLatestEmptyBucketTime());
} }
public void testMixedEmptyAndSparseBuckets() { public void testMixedEmptyAndSparseBuckets() {
DataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); DataStreamDiagnostics d = new DataStreamDiagnostics(job);
DataStreamDiagnostics d = new DataStreamDiagnostics(dataCountsReporter, analysisConfig, logger);
sendManyDataPoints(d, 10000, 69000, 1000); sendManyDataPoints(d, 10000, 69000, 1000);
sendManyDataPoints(d, 70000, 129000, 1200); sendManyDataPoints(d, 70000, 129000, 1200);
@ -202,11 +197,11 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
sendManyDataPoints(d, 550000, 609000, 1400); sendManyDataPoints(d, 550000, 609000, 1400);
d.flush(); d.flush();
assertEquals(10, dataCountsReporter.getBucketCount()); assertEquals(10, d.getBucketCount());
assertEquals(2, dataCountsReporter.getSparseBucketCount()); assertEquals(2, d.getSparseBucketCount());
assertEquals(new Date(420000), dataCountsReporter.getLatestSparseBucketTime()); assertEquals(new Date(420000), d.getLatestSparseBucketTime());
assertEquals(2, dataCountsReporter.getEmptyBucketCount()); assertEquals(2, d.getEmptyBucketCount());
assertEquals(new Date(480000), dataCountsReporter.getLatestEmptyBucketTime()); assertEquals(new Date(480000), d.getLatestEmptyBucketTime());
} }
/** /**
@ -214,8 +209,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
* whether counts are right. * whether counts are right.
*/ */
public void testEmptyBucketsLongerOutage() { public void testEmptyBucketsLongerOutage() {
DataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); DataStreamDiagnostics d = new DataStreamDiagnostics(job);
DataStreamDiagnostics d = new DataStreamDiagnostics(dataCountsReporter, analysisConfig, logger);
d.checkRecord(10000); d.checkRecord(10000);
d.checkRecord(70000); d.checkRecord(70000);
@ -230,11 +224,11 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
// 98 empty buckets // 98 empty buckets
d.checkRecord(6490000); d.checkRecord(6490000);
d.flush(); d.flush();
assertEquals(109, dataCountsReporter.getBucketCount()); assertEquals(109, d.getBucketCount());
assertEquals(100, dataCountsReporter.getEmptyBucketCount()); assertEquals(100, d.getEmptyBucketCount());
assertEquals(0, dataCountsReporter.getSparseBucketCount()); assertEquals(0, d.getSparseBucketCount());
assertEquals(null, dataCountsReporter.getLatestSparseBucketTime()); assertEquals(null, d.getLatestSparseBucketTime());
assertEquals(new Date(6420000), dataCountsReporter.getLatestEmptyBucketTime()); assertEquals(new Date(6420000), d.getLatestEmptyBucketTime());
} }
/** /**
@ -243,8 +237,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
* The number of sparse buckets should not be to much, it could be normal. * The number of sparse buckets should not be to much, it could be normal.
*/ */
public void testSparseBucketsLongerPeriod() { public void testSparseBucketsLongerPeriod() {
DataCountsReporter dataCountsReporter = new DummyDataCountsReporter(); DataStreamDiagnostics d = new DataStreamDiagnostics(job);
DataStreamDiagnostics d = new DataStreamDiagnostics(dataCountsReporter, analysisConfig, logger);
sendManyDataPoints(d, 10000, 69000, 1000); sendManyDataPoints(d, 10000, 69000, 1000);
sendManyDataPoints(d, 70000, 129000, 1200); sendManyDataPoints(d, 70000, 129000, 1200);
@ -260,11 +253,11 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
sendManyDataPoints(d, 550000, 609000, 1400); sendManyDataPoints(d, 550000, 609000, 1400);
d.flush(); d.flush();
assertEquals(10, dataCountsReporter.getBucketCount()); assertEquals(10, d.getBucketCount());
assertEquals(0, dataCountsReporter.getEmptyBucketCount()); assertEquals(0, d.getEmptyBucketCount());
assertEquals(2, dataCountsReporter.getSparseBucketCount()); assertEquals(2, d.getSparseBucketCount());
assertEquals(new Date(420000), dataCountsReporter.getLatestSparseBucketTime()); assertEquals(new Date(420000), d.getLatestSparseBucketTime());
assertEquals(null, dataCountsReporter.getLatestEmptyBucketTime()); assertEquals(null, d.getLatestEmptyBucketTime());
} }
private void sendManyDataPoints(DataStreamDiagnostics d, long recordTimestampInMsMin, long recordTimestampInMsMax, long howMuch) { private void sendManyDataPoints(DataStreamDiagnostics d, long recordTimestampInMsMin, long recordTimestampInMsMax, long howMuch) {
@ -275,5 +268,4 @@ public class DataStreamDiagnosticsTests extends ESTestCase {
d.checkRecord(recordTimestampInMsMin + i % range); d.checkRecord(recordTimestampInMsMin + i % range);
} }
} }
} }

View File

@ -6,10 +6,17 @@
package org.elasticsearch.xpack.ml.job.process; package org.elasticsearch.xpack.ml.job.process;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import java.util.Arrays;
import java.util.Date;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
/** /**
@ -20,7 +27,7 @@ class DummyDataCountsReporter extends DataCountsReporter {
int logStatusCallCount = 0; int logStatusCallCount = 0;
DummyDataCountsReporter() { DummyDataCountsReporter() {
super(mock(ThreadPool.class), Settings.EMPTY, "DummyJobId", new DataCounts("DummyJobId"), super(mock(ThreadPool.class), Settings.EMPTY, createJob(), new DataCounts("DummyJobId"),
mock(JobDataCountsPersister.class)); mock(JobDataCountsPersister.class));
} }
@ -47,4 +54,17 @@ class DummyDataCountsReporter extends DataCountsReporter {
public void close() { public void close() {
// Do nothing // Do nothing
} }
private static Job createJob() {
AnalysisConfig.Builder acBuilder = new AnalysisConfig.Builder(
Arrays.asList(new Detector.Builder("metric", "field").build()));
acBuilder.setBucketSpan(TimeValue.timeValueSeconds(300));
acBuilder.setLatency(TimeValue.ZERO);
acBuilder.setDetectors(Arrays.asList(new Detector.Builder("metric", "field").build()));
Job.Builder builder = new Job.Builder("dummy_job_id");
builder.setAnalysisConfig(acBuilder);
builder.setCreateTime(new Date());
return builder.build();
}
} }

View File

@ -61,6 +61,7 @@ public class MlBasicMultiNodeIT extends ESRestTestCase {
assertEquals(0, responseBody.get("invalid_date_count")); assertEquals(0, responseBody.get("invalid_date_count"));
assertEquals(0, responseBody.get("missing_field_count")); assertEquals(0, responseBody.get("missing_field_count"));
assertEquals(0, responseBody.get("out_of_order_timestamp_count")); assertEquals(0, responseBody.get("out_of_order_timestamp_count"));
assertEquals(1, responseBody.get("bucket_count"));
assertEquals(1403481600000L, responseBody.get("earliest_record_timestamp")); assertEquals(1403481600000L, responseBody.get("earliest_record_timestamp"));
assertEquals(1403481700000L, responseBody.get("latest_record_timestamp")); assertEquals(1403481700000L, responseBody.get("latest_record_timestamp"));
@ -85,6 +86,7 @@ public class MlBasicMultiNodeIT extends ESRestTestCase {
assertEquals(0, dataCountsDoc.get("invalid_date_count")); assertEquals(0, dataCountsDoc.get("invalid_date_count"));
assertEquals(0, dataCountsDoc.get("missing_field_count")); assertEquals(0, dataCountsDoc.get("missing_field_count"));
assertEquals(0, dataCountsDoc.get("out_of_order_timestamp_count")); assertEquals(0, dataCountsDoc.get("out_of_order_timestamp_count"));
assertEquals(1, dataCountsDoc.get("bucket_count"));
assertEquals(1403481600000L, dataCountsDoc.get("earliest_record_timestamp")); assertEquals(1403481600000L, dataCountsDoc.get("earliest_record_timestamp"));
assertEquals(1403481700000L, dataCountsDoc.get("latest_record_timestamp")); assertEquals(1403481700000L, dataCountsDoc.get("latest_record_timestamp"));
@ -160,6 +162,107 @@ public class MlBasicMultiNodeIT extends ESRestTestCase {
assertEquals(200, response.getStatusLine().getStatusCode()); assertEquals(200, response.getStatusLine().getStatusCode());
} }
public void testMiniFarequoteReopen() throws Exception {
String jobId = "foo1_again";
createFarequoteJob(jobId);
Response response = client().performRequest("post", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open");
assertEquals(200, response.getStatusLine().getStatusCode());
assertEquals(Collections.singletonMap("opened", true), responseEntityToMap(response));
assertBusy(this::assertSameClusterStateOnAllNodes);
String postData =
"{\"airline\":\"AAL\",\"responsetime\":\"132.2046\",\"sourcetype\":\"farequote\",\"time\":\"1403481600\"}\n" +
"{\"airline\":\"JZA\",\"responsetime\":\"990.4628\",\"sourcetype\":\"farequote\",\"time\":\"1403481700\"}\n" +
"{\"airline\":\"JBU\",\"responsetime\":\"877.5927\",\"sourcetype\":\"farequote\",\"time\":\"1403481800\"}\n" +
"{\"airline\":\"KLM\",\"responsetime\":\"1355.4812\",\"sourcetype\":\"farequote\",\"time\":\"1403481900\"}\n" +
"{\"airline\":\"NKS\",\"responsetime\":\"9991.3981\",\"sourcetype\":\"farequote\",\"time\":\"1403482000\"}";
response = client().performRequest("post", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_data",
Collections.emptyMap(),
new StringEntity(postData, randomFrom(ContentType.APPLICATION_JSON, ContentType.create("application/x-ndjson"))));
assertEquals(202, response.getStatusLine().getStatusCode());
Map<String, Object> responseBody = responseEntityToMap(response);
assertEquals(5, responseBody.get("processed_record_count"));
assertEquals(10, responseBody.get("processed_field_count"));
assertEquals(446, responseBody.get("input_bytes"));
assertEquals(15, responseBody.get("input_field_count"));
assertEquals(0, responseBody.get("invalid_date_count"));
assertEquals(0, responseBody.get("missing_field_count"));
assertEquals(0, responseBody.get("out_of_order_timestamp_count"));
assertEquals(1, responseBody.get("bucket_count"));
assertEquals(1403481600000L, responseBody.get("earliest_record_timestamp"));
assertEquals(1403482000000L, responseBody.get("latest_record_timestamp"));
response = client().performRequest("post", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_flush");
assertEquals(200, response.getStatusLine().getStatusCode());
assertEquals(Collections.singletonMap("flushed", true), responseEntityToMap(response));
response = client().performRequest("post", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_close",
Collections.singletonMap("timeout", "20s"));
assertEquals(200, response.getStatusLine().getStatusCode());
assertEquals(Collections.singletonMap("closed", true), responseEntityToMap(response));
response = client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats");
assertEquals(200, response.getStatusLine().getStatusCode());
response = client().performRequest("post", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open",
Collections.singletonMap("timeout", "20s"));
assertEquals(200, response.getStatusLine().getStatusCode());
assertEquals(Collections.singletonMap("opened", true), responseEntityToMap(response));
assertBusy(this::assertSameClusterStateOnAllNodes);
// feed some more data points
postData =
"{\"airline\":\"AAL\",\"responsetime\":\"136.2361\",\"sourcetype\":\"farequote\",\"time\":\"1407081600\"}\n" +
"{\"airline\":\"VRD\",\"responsetime\":\"282.9847\",\"sourcetype\":\"farequote\",\"time\":\"1407081700\"}\n" +
"{\"airline\":\"JAL\",\"responsetime\":\"493.0338\",\"sourcetype\":\"farequote\",\"time\":\"1407081800\"}\n" +
"{\"airline\":\"UAL\",\"responsetime\":\"8.4275\",\"sourcetype\":\"farequote\",\"time\":\"1407081900\"}\n" +
"{\"airline\":\"FFT\",\"responsetime\":\"221.8693\",\"sourcetype\":\"farequote\",\"time\":\"1407082000\"}";
response = client().performRequest("post", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_data",
Collections.emptyMap(),
new StringEntity(postData, randomFrom(ContentType.APPLICATION_JSON, ContentType.create("application/x-ndjson"))));
assertEquals(202, response.getStatusLine().getStatusCode());
responseBody = responseEntityToMap(response);
assertEquals(5, responseBody.get("processed_record_count"));
assertEquals(10, responseBody.get("processed_field_count"));
assertEquals(442, responseBody.get("input_bytes"));
assertEquals(15, responseBody.get("input_field_count"));
assertEquals(0, responseBody.get("invalid_date_count"));
assertEquals(0, responseBody.get("missing_field_count"));
assertEquals(0, responseBody.get("out_of_order_timestamp_count"));
assertEquals(1, responseBody.get("bucket_count"));
// unintuitive: should return the earliest record timestamp of this feed???
assertEquals(null, responseBody.get("earliest_record_timestamp"));
assertEquals(1407082000000L, responseBody.get("latest_record_timestamp"));
response = client().performRequest("post", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_close",
Collections.singletonMap("timeout", "20s"));
assertEquals(200, response.getStatusLine().getStatusCode());
assertEquals(Collections.singletonMap("closed", true), responseEntityToMap(response));
// counts should be summed up
response = client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats");
assertEquals(200, response.getStatusLine().getStatusCode());
@SuppressWarnings("unchecked")
Map<String, Object> dataCountsDoc = (Map<String, Object>)
((Map)((List) responseEntityToMap(response).get("jobs")).get(0)).get("data_counts");
assertEquals(10, dataCountsDoc.get("processed_record_count"));
assertEquals(20, dataCountsDoc.get("processed_field_count"));
assertEquals(888, dataCountsDoc.get("input_bytes"));
assertEquals(30, dataCountsDoc.get("input_field_count"));
assertEquals(0, dataCountsDoc.get("invalid_date_count"));
assertEquals(0, dataCountsDoc.get("missing_field_count"));
assertEquals(0, dataCountsDoc.get("out_of_order_timestamp_count"));
assertEquals(2, dataCountsDoc.get("bucket_count"));
assertEquals(1403481600000L, dataCountsDoc.get("earliest_record_timestamp"));
assertEquals(1407082000000L, dataCountsDoc.get("latest_record_timestamp"));
response = client().performRequest("delete", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
assertEquals(200, response.getStatusLine().getStatusCode());
}
private Response createDatafeed(String datafeedId, String jobId) throws Exception { private Response createDatafeed(String datafeedId, String jobId) throws Exception {
XContentBuilder xContentBuilder = jsonBuilder(); XContentBuilder xContentBuilder = jsonBuilder();
xContentBuilder.startObject(); xContentBuilder.startObject();