mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-25 01:19:02 +00:00
ML: changing automatic check_window calculation (#35643)
* ML: changing automatic check_window calculation * adding docs on how we calculate the default
This commit is contained in:
parent
f70b7dc158
commit
bc7dea4480
@ -72,6 +72,7 @@ The window must be larger than the Job's bucket size, but smaller than 24 hours,
|
||||
and span less than 10,000 buckets.
|
||||
Defaults to `null`, which causes an appropriate window span to be calculated when
|
||||
the datafeed runs.
|
||||
The default `check_window` span is the larger of `2h` and `8 * bucket_span`.
|
||||
To explicitly disable the check, pass `DelayedDataCheckConfig.disabledDelayedDataCheckConfig()`.
|
||||
|
||||
["source","java",subs="attributes,callouts,macros"]
|
||||
|
@ -111,6 +111,7 @@ The configuration object has the following properties:
|
||||
(time units) The window of time before the latest finalized bucket that should be searched
|
||||
for late data. Defaults to `null`, which causes an appropriate `check_window` to be calculated
|
||||
when the real-time {dfeed} runs.
|
||||
The default `check_window` span is the larger of `2h` and `8 * bucket_span`.
|
||||
|
||||
[float]
|
||||
[[ml-datafeed-counts]]
|
||||
|
@ -23,11 +23,9 @@ public final class Messages {
|
||||
"script_fields cannot be used in combination with aggregations";
|
||||
public static final String DATAFEED_CONFIG_INVALID_OPTION_VALUE = "Invalid {0} value ''{1}'' in datafeed configuration";
|
||||
public static final String DATAFEED_CONFIG_DELAYED_DATA_CHECK_TOO_SMALL =
|
||||
"delayed_data_check_window [{0}] must be greater than the bucket_span [{1}]";
|
||||
public static final String DATAFEED_CONFIG_DELAYED_DATA_CHECK_TOO_LARGE =
|
||||
"delayed_data_check_window [{0}] must be less than or equal to [24h]";
|
||||
"delayed_data_check_config: check_window [{0}] must be greater than the bucket_span [{1}]";
|
||||
public static final String DATAFEED_CONFIG_DELAYED_DATA_CHECK_SPANS_TOO_MANY_BUCKETS =
|
||||
"delayed_data_check_window [{0}] must be less than 10,000x the bucket_span [{1}]";
|
||||
"delayed_data_check_config: check_window [{0}] must be less than 10,000x the bucket_span [{1}]";
|
||||
|
||||
public static final String DATAFEED_DOES_NOT_SUPPORT_JOB_WITH_LATENCY = "A job configured with datafeed cannot support latency";
|
||||
public static final String DATAFEED_NOT_FOUND = "No datafeed with id [{0}] exists";
|
||||
|
@ -21,8 +21,8 @@ import java.util.Objects;
|
||||
public class DelayedDataDetectorFactory {
|
||||
|
||||
// There are eight 15min buckets in a two hour span, so matching that number as the fallback for very long buckets
|
||||
private static final int FALLBACK_NUMBER_OF_BUCKETS_TO_SPAN = 8;
|
||||
private static final TimeValue DEFAULT_CHECK_WINDOW = TimeValue.timeValueHours(2);
|
||||
private static final int DEFAULT_NUMBER_OF_BUCKETS_TO_SPAN = 8;
|
||||
private static final long DEFAULT_CHECK_WINDOW_MS = 7_200_000L; // 2 hours in Milliseconds
|
||||
|
||||
/**
|
||||
* This will build the appropriate detector given the parameters.
|
||||
@ -57,11 +57,7 @@ public class DelayedDataDetectorFactory {
|
||||
return 0;
|
||||
}
|
||||
if (currentWindow == null) { // we should provide a good default as the user did not specify a window
|
||||
if(bucketSpan.compareTo(DEFAULT_CHECK_WINDOW) >= 0) {
|
||||
return FALLBACK_NUMBER_OF_BUCKETS_TO_SPAN * bucketSpan.millis();
|
||||
} else {
|
||||
return DEFAULT_CHECK_WINDOW.millis();
|
||||
}
|
||||
return Math.max(DEFAULT_CHECK_WINDOW_MS, DEFAULT_NUMBER_OF_BUCKETS_TO_SPAN * bucketSpan.millis());
|
||||
}
|
||||
if (currentWindow.compareTo(bucketSpan) < 0) {
|
||||
throw new IllegalArgumentException(
|
||||
|
@ -52,13 +52,13 @@ public class DelayedDataDetectorFactoryTests extends ESTestCase {
|
||||
assertEquals(Messages.getMessage(
|
||||
Messages.DATAFEED_CONFIG_DELAYED_DATA_CHECK_SPANS_TOO_MANY_BUCKETS, "12h", "2s"), e.getMessage());
|
||||
|
||||
Job withBigBucketSpan = createJob(TimeValue.timeValueHours(3));
|
||||
Job withBigBucketSpan = createJob(TimeValue.timeValueHours(1));
|
||||
datafeedConfig = createDatafeed(true, null);
|
||||
|
||||
// Should not throw
|
||||
DelayedDataDetector delayedDataDetector =
|
||||
DelayedDataDetectorFactory.buildDetector(withBigBucketSpan, datafeedConfig, mock(Client.class));
|
||||
assertThat(delayedDataDetector.getWindow(), equalTo(TimeValue.timeValueHours(3).millis() * 8));
|
||||
assertThat(delayedDataDetector.getWindow(), equalTo(TimeValue.timeValueHours(1).millis() * 8));
|
||||
|
||||
datafeedConfig = createDatafeed(true, null);
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user