Fix rollup on date fields that don't support epoch_millis (#31890)

The rollup indexer uses a range query to select the next page
of results based on the last time bucket of the previous round
and the `delay` configured on the rollup job. This query uses
the `epoch_millis` format implicitly but doesn't set the `format`.
This results in errors during the rollup job if the field
definition doesn't allow this format. It can also cause the job to
miss documents if `epoch_millis` is not accepted but another format
in the field definition (e.g. `epoch_second`) is able to parse the query.
This change ensures that we use `epoch_millis` as the only format
to parse the rollup range query.
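
For illustration, a minimal sketch of the paging query after this change. The method name and enclosing class are assumed for the example; the builder calls mirror the RollupIndexer hunk below, and lowerBound/maxBoundary stand for the last bucket boundary and the delay-adjusted upper bound, both in epoch milliseconds.

import org.elasticsearch.index.query.RangeQueryBuilder;

// Sketch only: the method name is hypothetical. Without the explicit format,
// a date field mapped only as e.g. "epoch_second" either rejects the millisecond
// bounds or silently parses them as seconds and skips documents.
static RangeQueryBuilder createBoundedRangeQuery(String fieldName, long lowerBound, long maxBoundary) {
    assert lowerBound <= maxBoundary;
    return new RangeQueryBuilder(fieldName)
            .gte(lowerBound)          // last processed rollup bucket, in epoch millis
            .lt(maxBoundary)          // "now" minus the job's configured delay
            .format("epoch_millis");  // force millisecond parsing of both bounds
}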
Author: Jim Ferenczi, 2018-07-19 09:34:23 +02:00 (committed by GitHub)
Parent: 38e2e1d553
Commit: 644a92f158
3 changed files with 43 additions and 13 deletions

RollupIndexer.java

@@ -425,7 +425,8 @@ public abstract class RollupIndexer {
assert lowerBound <= maxBoundary;
final RangeQueryBuilder query = new RangeQueryBuilder(fieldName)
.gte(lowerBound)
- .lt(maxBoundary);
+ .lt(maxBoundary)
+ .format("epoch_millis");
return query;
}
}

RollupIndexerIndexingTests.java

@@ -29,6 +29,8 @@ import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchResponseSections;
import org.elasticsearch.action.search.ShardSearchFailure;
+ import org.elasticsearch.common.joda.DateMathParser;
+ import org.elasticsearch.common.joda.Joda;
import org.elasticsearch.common.rounding.Rounding;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexSettings;
@@ -506,6 +508,7 @@ public class RollupIndexerIndexingTests extends AggregatorTestCase {
private Map<String, MappedFieldType> createFieldTypes(RollupJobConfig job) {
Map<String, MappedFieldType> fieldTypes = new HashMap<>();
MappedFieldType fieldType = new DateFieldMapper.Builder(job.getGroupConfig().getDateHisto().getField())
+ .dateTimeFormatter(Joda.forPattern(randomFrom("basic_date", "date_optional_time", "epoch_second")))
.build(new Mapper.BuilderContext(settings.getSettings(), new ContentPath(0)))
.fieldType();
fieldTypes.put(fieldType.name(), fieldType);
@@ -618,7 +621,7 @@ public class RollupIndexerIndexingTests extends AggregatorTestCase {
RangeQueryBuilder range = (RangeQueryBuilder) request.source().query();
final DateTimeZone timeZone = range.timeZone() != null ? DateTimeZone.forID(range.timeZone()) : null;
Query query = timestampField.rangeQuery(range.from(), range.to(), range.includeLower(), range.includeUpper(),
- null, timeZone, null, queryShardContext);
+ null, timeZone, new DateMathParser(Joda.forPattern(range.format())), queryShardContext);
// extract composite agg
assertThat(request.source().aggregations().getAggregatorFactories().size(), equalTo(1));

RollupIT.java

@@ -6,12 +6,16 @@
package org.elasticsearch.multi_node;
import org.apache.http.HttpStatus;
+ import org.apache.http.entity.ContentType;
+ import org.apache.http.entity.StringEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
+ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
+ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
@@ -33,8 +37,8 @@ import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
- import java.util.stream.Collectors;
+ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.isOneOf;
@@ -73,6 +77,31 @@ public class RollupIT extends ESRestTestCase {
public void testBigRollup() throws Exception {
final int numDocs = 200;
+ String dateFormat = "strict_date_optional_time";
+ // create the test-index index
+ try (XContentBuilder builder = jsonBuilder()) {
+ builder.startObject();
+ {
+ builder.startObject("mappings").startObject("_doc")
+ .startObject("properties")
+ .startObject("timestamp")
+ .field("type", "date")
+ .field("format", dateFormat)
+ .endObject()
+ .startObject("value")
+ .field("type", "integer")
+ .endObject()
+ .endObject()
+ .endObject().endObject();
+ }
+ builder.endObject();
+ final StringEntity entity = new StringEntity(Strings.toString(builder), ContentType.APPLICATION_JSON);
+ Request req = new Request("PUT", "rollup-docs");
+ req.setEntity(entity);
+ client().performRequest(req);
+ }
// index documents for the rollup job
final StringBuilder bulk = new StringBuilder();
@@ -88,13 +117,15 @@ public class RollupIT extends ESRestTestCase {
bulkRequest.addParameter("refresh", "true");
bulkRequest.setJsonEntity(bulk.toString());
client().performRequest(bulkRequest);
// create the rollup job
final Request createRollupJobRequest = new Request("PUT", "/_xpack/rollup/job/rollup-job-test");
+ int pageSize = randomIntBetween(2, 50);
createRollupJobRequest.setJsonEntity("{"
+ "\"index_pattern\":\"rollup-*\","
+ "\"rollup_index\":\"results-rollup\","
+ "\"cron\":\"*/1 * * * * ?\"," // fast cron and big page size so test runs quickly
+ "\"page_size\":20,"
+ "\"cron\":\"*/1 * * * * ?\"," // fast cron so test runs quickly
+ "\"page_size\":" + pageSize + ","
+ "\"groups\":{"
+ " \"date_histogram\":{"
+ " \"field\":\"timestamp\","
@@ -142,7 +173,8 @@ public class RollupIT extends ESRestTestCase {
" \"date_histo\": {\n" +
" \"date_histogram\": {\n" +
" \"field\": \"timestamp\",\n" +
" \"interval\": \"1h\"\n" +
" \"interval\": \"1h\",\n" +
" \"format\": \"date_time\"\n" +
" },\n" +
" \"aggs\": {\n" +
" \"the_max\": {\n" +
@@ -226,7 +258,7 @@ public class RollupIT extends ESRestTestCase {
}
- private void waitForRollUpJob(final String rollupJob,String[] expectedStates) throws Exception {
+ private void waitForRollUpJob(final String rollupJob, String[] expectedStates) throws Exception {
assertBusy(() -> {
final Request getRollupJobRequest = new Request("GET", "_xpack/rollup/job/" + rollupJob);
Response getRollupJobResponse = client().performRequest(getRollupJobRequest);
@@ -317,10 +349,4 @@ public class RollupIT extends ESRestTestCase {
}
}
}
- private static String responseEntityToString(Response response) throws Exception {
- try (BufferedReader reader = new BufferedReader(new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8))) {
- return reader.lines().collect(Collectors.joining("\n"));
- }
- }
}