Fix max boundary for rollup jobs that use a delay (#42158)

Rollup jobs can define how long they should wait before rolling up new documents.
However, if the delay is smaller than the rollup interval, or if it's not a multiple of the rollup interval,
the job can create incomplete buckets, because the max boundary for a job is computed
from the time when the job started, rounded to the interval, minus the delay. This change
fixes this computation by applying the delay subtraction before the rounding, in order to ensure
that we never create a boundary that falls in the middle of a bucket.
This commit is contained in:
Jim Ferenczi 2019-05-20 09:55:35 -04:00 committed by jimczi
parent b0a25c3170
commit ec63160243
2 changed files with 57 additions and 8 deletions

View File

@ -110,15 +110,12 @@ public abstract class RollupIndexer extends AsyncTwoPhaseIndexer<Map<String, Obj
@Override
protected void onStart(long now, ActionListener<Void> listener) {
try {
// this is needed to exclude buckets that can still receive new documents.
// this is needed to exclude buckets that can still receive new documents
DateHistogramGroupConfig dateHisto = job.getConfig().getGroupConfig().getDateHistogram();
long rounded = dateHisto.createRounding().round(now);
if (dateHisto.getDelay() != null) {
// if the job has a delay we filter all documents that appear before it.
maxBoundary = rounded - TimeValue.parseTimeValue(dateHisto.getDelay().toString(), "").millis();
} else {
maxBoundary = rounded;
}
// if the job has a delay we filter all documents that appear before it
long delay = dateHisto.getDelay() != null ?
TimeValue.parseTimeValue(dateHisto.getDelay().toString(), "").millis() : 0;
maxBoundary = dateHisto.createRounding().round(now - delay);
listener.onResponse(null);
} catch (Exception e) {
listener.onFailure(e);

View File

@ -328,6 +328,58 @@ public class RollupIndexerIndexingTests extends AggregatorTestCase {
});
}
/**
 * Rolls up a 1h date histogram configured with a 15m delay, i.e. a delay that is
 * shorter than (and not a multiple of) the interval.
 *
 * The fixture holds one document every 15 minutes from 135 minutes before "now"
 * up to "now" (10:30). The test expects exactly two rollup documents: one for the
 * bucket containing now-2h (3 source documents) and one for the bucket containing
 * now-1h (4 source documents). No rollup document is expected for the bucket that
 * contains "now" itself.
 */
public void testSimpleDateHistoWithOverlappingDelay() throws Exception {
    final String targetIndex = randomAlphaOfLengthBetween(5, 10);
    final String histoField = "the_histo";
    final DateHistogramInterval interval = new DateHistogramInterval("1h");
    final DateHistogramInterval delay = new DateHistogramInterval("15m");
    final DateHistogramGroupConfig histoConfig = new DateHistogramGroupConfig(histoField, interval, delay, null);
    final RollupJobConfig jobConfig = createJob(targetIndex, new GroupConfig(histoConfig), Collections.emptyList());

    final long now = asLong("2015-04-01T10:30:00.000Z");

    // One document per quarter hour, oldest first: now-135m, now-120m, ..., now-15m, now.
    final List<Map<String, Object>> docs = new ArrayList<>();
    for (long minutesAgo = 135; minutesAgo >= 0; minutesAgo -= 15) {
        docs.add(asMap("the_histo", now - TimeValue.timeValueMinutes(minutesAgo).getMillis()));
    }

    final Rounding bucketRounding = histoConfig.createRounding();
    executeTestCase(docs, jobConfig, now, (indexRequests) -> {
        assertThat(indexRequests.size(), equalTo(2));

        // Bucket holding now-2h: documents at now-135m, now-120m and now-105m.
        final IndexRequest olderBucket = indexRequests.get(0);
        final long olderTimestamp = bucketRounding.round(now - TimeValue.timeValueHours(2).getMillis());
        assertThat(olderBucket.index(), equalTo(targetIndex));
        assertThat(olderBucket.sourceAsMap(), equalTo(
            asMap(
                "_rollup.version", newIDScheme ? 2 : 1,
                "the_histo.date_histogram.timestamp", olderTimestamp,
                "the_histo.date_histogram.interval", "1h",
                "the_histo.date_histogram._count", 3,
                "the_histo.date_histogram.time_zone", DateTimeZone.UTC.toString(),
                "_rollup.id", jobConfig.getId()
            )
        ));

        // Bucket holding now-1h: documents at now-90m, now-75m, now-60m and now-45m.
        final IndexRequest newerBucket = indexRequests.get(1);
        final long newerTimestamp = bucketRounding.round(now - TimeValue.timeValueHours(1).getMillis());
        assertThat(newerBucket.index(), equalTo(targetIndex));
        assertThat(newerBucket.sourceAsMap(), equalTo(
            asMap(
                "_rollup.version", newIDScheme ? 2 : 1,
                "the_histo.date_histogram.timestamp", newerTimestamp,
                "the_histo.date_histogram.interval", "1h",
                "the_histo.date_histogram._count", 4,
                "the_histo.date_histogram.time_zone", DateTimeZone.UTC.toString(),
                "_rollup.id", jobConfig.getId()
            )
        ));
    });
}
public void testSimpleDateHistoWithTimeZone() throws Exception {
final List<Map<String, Object>> dataset = new ArrayList<>();
long now = asLong("2015-04-01T10:00:00.000Z");