[Rollup] Validate timezones based on rules not string comparision (#36237)

The date_histogram internally converts obsolete timezones (such as
"Canada/Mountain") into their modern equivalent ("America/Edmonton").
But rollup just stored the TZ as provided by the user.

When checking the TZ for query validation we used a string comparison,
which would fail due to the date_histo's upgrading behavior.

Instead, we should convert both to a TimeZone object and check if their
rules are compatible.
This commit is contained in:
Zachary Tong 2019-04-17 13:33:51 -04:00 committed by Zachary Tong
parent 4d964194db
commit 7e62ff2823
15 changed files with 645 additions and 158 deletions

View File

@ -66,6 +66,127 @@ public class DateUtils {
DEPRECATED_SHORT_TZ_IDS = tzs.keySet();
}
// Map of deprecated timezones and their recommended new counterpart
public static final Map<String, String> DEPRECATED_LONG_TIMEZONES;
static {
Map<String, String> tzs = new HashMap<>();
tzs.put("Africa/Asmera","Africa/Nairobi");
tzs.put("Africa/Timbuktu","Africa/Abidjan");
tzs.put("America/Argentina/ComodRivadavia","America/Argentina/Catamarca");
tzs.put("America/Atka","America/Adak");
tzs.put("America/Buenos_Aires","America/Argentina/Buenos_Aires");
tzs.put("America/Catamarca","America/Argentina/Catamarca");
tzs.put("America/Coral_Harbour","America/Atikokan");
tzs.put("America/Cordoba","America/Argentina/Cordoba");
tzs.put("America/Ensenada","America/Tijuana");
tzs.put("America/Fort_Wayne","America/Indiana/Indianapolis");
tzs.put("America/Indianapolis","America/Indiana/Indianapolis");
tzs.put("America/Jujuy","America/Argentina/Jujuy");
tzs.put("America/Knox_IN","America/Indiana/Knox");
tzs.put("America/Louisville","America/Kentucky/Louisville");
tzs.put("America/Mendoza","America/Argentina/Mendoza");
tzs.put("America/Montreal","America/Toronto");
tzs.put("America/Porto_Acre","America/Rio_Branco");
tzs.put("America/Rosario","America/Argentina/Cordoba");
tzs.put("America/Santa_Isabel","America/Tijuana");
tzs.put("America/Shiprock","America/Denver");
tzs.put("America/Virgin","America/Port_of_Spain");
tzs.put("Antarctica/South_Pole","Pacific/Auckland");
tzs.put("Asia/Ashkhabad","Asia/Ashgabat");
tzs.put("Asia/Calcutta","Asia/Kolkata");
tzs.put("Asia/Chongqing","Asia/Shanghai");
tzs.put("Asia/Chungking","Asia/Shanghai");
tzs.put("Asia/Dacca","Asia/Dhaka");
tzs.put("Asia/Harbin","Asia/Shanghai");
tzs.put("Asia/Kashgar","Asia/Urumqi");
tzs.put("Asia/Katmandu","Asia/Kathmandu");
tzs.put("Asia/Macao","Asia/Macau");
tzs.put("Asia/Rangoon","Asia/Yangon");
tzs.put("Asia/Saigon","Asia/Ho_Chi_Minh");
tzs.put("Asia/Tel_Aviv","Asia/Jerusalem");
tzs.put("Asia/Thimbu","Asia/Thimphu");
tzs.put("Asia/Ujung_Pandang","Asia/Makassar");
tzs.put("Asia/Ulan_Bator","Asia/Ulaanbaatar");
tzs.put("Atlantic/Faeroe","Atlantic/Faroe");
tzs.put("Atlantic/Jan_Mayen","Europe/Oslo");
tzs.put("Australia/ACT","Australia/Sydney");
tzs.put("Australia/Canberra","Australia/Sydney");
tzs.put("Australia/LHI","Australia/Lord_Howe");
tzs.put("Australia/NSW","Australia/Sydney");
tzs.put("Australia/North","Australia/Darwin");
tzs.put("Australia/Queensland","Australia/Brisbane");
tzs.put("Australia/South","Australia/Adelaide");
tzs.put("Australia/Tasmania","Australia/Hobart");
tzs.put("Australia/Victoria","Australia/Melbourne");
tzs.put("Australia/West","Australia/Perth");
tzs.put("Australia/Yancowinna","Australia/Broken_Hill");
tzs.put("Brazil/Acre","America/Rio_Branco");
tzs.put("Brazil/DeNoronha","America/Noronha");
tzs.put("Brazil/East","America/Sao_Paulo");
tzs.put("Brazil/West","America/Manaus");
tzs.put("Canada/Atlantic","America/Halifax");
tzs.put("Canada/Central","America/Winnipeg");
tzs.put("Canada/East-Saskatchewan","America/Regina");
tzs.put("Canada/Eastern","America/Toronto");
tzs.put("Canada/Mountain","America/Edmonton");
tzs.put("Canada/Newfoundland","America/St_Johns");
tzs.put("Canada/Pacific","America/Vancouver");
tzs.put("Canada/Yukon","America/Whitehorse");
tzs.put("Chile/Continental","America/Santiago");
tzs.put("Chile/EasterIsland","Pacific/Easter");
tzs.put("Cuba","America/Havana");
tzs.put("Egypt","Africa/Cairo");
tzs.put("Eire","Europe/Dublin");
tzs.put("Europe/Belfast","Europe/London");
tzs.put("Europe/Tiraspol","Europe/Chisinau");
tzs.put("GB","Europe/London");
tzs.put("GB-Eire","Europe/London");
tzs.put("Greenwich","Etc/GMT");
tzs.put("Hongkong","Asia/Hong_Kong");
tzs.put("Iceland","Atlantic/Reykjavik");
tzs.put("Iran","Asia/Tehran");
tzs.put("Israel","Asia/Jerusalem");
tzs.put("Jamaica","America/Jamaica");
tzs.put("Japan","Asia/Tokyo");
tzs.put("Kwajalein","Pacific/Kwajalein");
tzs.put("Libya","Africa/Tripoli");
tzs.put("Mexico/BajaNorte","America/Tijuana");
tzs.put("Mexico/BajaSur","America/Mazatlan");
tzs.put("Mexico/General","America/Mexico_City");
tzs.put("NZ","Pacific/Auckland");
tzs.put("NZ-CHAT","Pacific/Chatham");
tzs.put("Navajo","America/Denver");
tzs.put("PRC","Asia/Shanghai");
tzs.put("Pacific/Johnston","Pacific/Honolulu");
tzs.put("Pacific/Ponape","Pacific/Pohnpei");
tzs.put("Pacific/Samoa","Pacific/Pago_Pago");
tzs.put("Pacific/Truk","Pacific/Chuuk");
tzs.put("Pacific/Yap","Pacific/Chuuk");
tzs.put("Poland","Europe/Warsaw");
tzs.put("Portugal","Europe/Lisbon");
tzs.put("ROC","Asia/Taipei");
tzs.put("ROK","Asia/Seoul");
tzs.put("Singapore","Asia/Singapore");
tzs.put("Turkey","Europe/Istanbul");
tzs.put("UCT","Etc/UCT");
tzs.put("US/Alaska","America/Anchorage");
tzs.put("US/Aleutian","America/Adak");
tzs.put("US/Arizona","America/Phoenix");
tzs.put("US/Central","America/Chicago");
tzs.put("US/East-Indiana","America/Indiana/Indianapolis");
tzs.put("US/Eastern","America/New_York");
tzs.put("US/Hawaii","Pacific/Honolulu");
tzs.put("US/Indiana-Starke","America/Indiana/Knox");
tzs.put("US/Michigan","America/Detroit");
tzs.put("US/Mountain","America/Denver");
tzs.put("US/Pacific","America/Los_Angeles");
tzs.put("US/Samoa","Pacific/Pago_Pago");
tzs.put("Universal","Etc/UTC");
tzs.put("W-SU","Europe/Moscow");
tzs.put("Zulu","Etc/UTC");
DEPRECATED_LONG_TIMEZONES = Collections.unmodifiableMap(tzs);
}
public static ZoneId dateTimeZoneToZoneId(DateTimeZone timeZone) {
if (timeZone == null) {
return null;

View File

@ -24,6 +24,7 @@ import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInter
import java.io.IOException;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Map;
import java.util.Objects;
@ -52,7 +53,8 @@ public class DateHistogramGroupConfig implements Writeable, ToXContentObject {
private static final String FIELD = "field";
public static final String TIME_ZONE = "time_zone";
public static final String DELAY = "delay";
private static final String DEFAULT_TIMEZONE = "UTC";
public static final String DEFAULT_TIMEZONE = "UTC";
public static final ZoneId DEFAULT_ZONEID_TIMEZONE = ZoneOffset.UTC;
private static final ConstructingObjectParser<DateHistogramGroupConfig, Void> PARSER;
static {
PARSER = new ConstructingObjectParser<>(NAME, a ->
@ -210,12 +212,12 @@ public class DateHistogramGroupConfig implements Writeable, ToXContentObject {
return Objects.equals(interval, that.interval)
&& Objects.equals(field, that.field)
&& Objects.equals(delay, that.delay)
&& Objects.equals(timeZone, that.timeZone);
&& ZoneId.of(timeZone, ZoneId.SHORT_IDS).getRules().equals(ZoneId.of(that.timeZone, ZoneId.SHORT_IDS).getRules());
}
@Override
public int hashCode() {
return Objects.hash(interval, field, delay, timeZone);
return Objects.hash(interval, field, delay, ZoneId.of(timeZone));
}
@Override
@ -235,7 +237,7 @@ public class DateHistogramGroupConfig implements Writeable, ToXContentObject {
} else {
rounding = new Rounding.Builder(TimeValue.parseTimeValue(expr, "createRounding"));
}
rounding.timeZone(ZoneId.of(timeZone));
rounding.timeZone(ZoneId.of(timeZone, ZoneId.SHORT_IDS));
return rounding.build();
}
}

View File

@ -23,6 +23,8 @@ import org.elasticsearch.xpack.core.rollup.action.RollupJobCaps.RollupFieldCaps;
import org.elasticsearch.xpack.core.rollup.job.DateHistogramGroupConfig;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -124,7 +126,7 @@ public class RollupDataExtractorFactory implements DataExtractorFactory {
if (rollupJobGroupConfig.hasDatehistogram() == false) {
return false;
}
if ("UTC".equalsIgnoreCase(rollupJobGroupConfig.getTimezone()) == false) {
if (ZoneId.of(rollupJobGroupConfig.getTimezone()).getRules().equals(ZoneOffset.UTC.getRules()) == false) {
return false;
}
try {

View File

@ -17,6 +17,7 @@ import org.elasticsearch.xpack.core.rollup.RollupField;
import org.elasticsearch.xpack.core.rollup.action.RollupJobCaps;
import org.elasticsearch.xpack.core.rollup.job.DateHistogramGroupConfig;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
@ -96,11 +97,13 @@ public class RollupJobIdentifierUtils {
if (agg.get(RollupField.AGG).equals(DateHistogramAggregationBuilder.NAME)) {
DateHistogramInterval interval = new DateHistogramInterval((String)agg.get(RollupField.INTERVAL));
String thisTimezone = (String)agg.get(DateHistogramGroupConfig.TIME_ZONE);
String sourceTimeZone = source.timeZone() == null ? "UTC" : source.timeZone().toString();
ZoneId thisTimezone = ZoneId.of(((String) agg.get(DateHistogramGroupConfig.TIME_ZONE)), ZoneId.SHORT_IDS);
ZoneId sourceTimeZone = source.timeZone() == null
? DateHistogramGroupConfig.DEFAULT_ZONEID_TIMEZONE
: ZoneId.of(source.timeZone().toString(), ZoneId.SHORT_IDS);
// Ensure we are working on the same timezone
if (thisTimezone.equalsIgnoreCase(sourceTimeZone) == false) {
if (thisTimezone.getRules().equals(sourceTimeZone.getRules()) == false) {
continue;
}
if (source.dateHistogramInterval() != null) {

View File

@ -11,8 +11,6 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
@ -22,8 +20,8 @@ import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder;
import org.elasticsearch.xpack.core.rollup.RollupField;
import org.elasticsearch.xpack.core.rollup.job.DateHistogramGroupConfig;
import org.joda.time.DateTimeZone;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -47,7 +45,7 @@ import java.util.function.Supplier;
* }</pre>
*
*
* The only publicly "consumable" API is {@link #translateAggregation(AggregationBuilder, List, NamedWriteableRegistry)}.
* The only publicly "consumable" API is {@link #translateAggregation(AggregationBuilder, NamedWriteableRegistry)}.
*/
public class RollupRequestTranslator {
@ -116,26 +114,22 @@ public class RollupRequestTranslator {
* relevant method below.
*
* @param source The source aggregation to translate into rollup-enabled version
* @param filterConditions A list used to track any filter conditions that sub-aggs may
* require.
* @param registry Registry containing the various aggregations so that we can easily
* deserialize into a stream for cloning
* @return Returns the fully translated aggregation tree. Note that it returns a list instead
* of a single AggBuilder, since some aggregations (e.g. avg) may result in two
* translated aggs (sum + count)
*/
public static List<AggregationBuilder> translateAggregation(AggregationBuilder source,
List<QueryBuilder> filterConditions,
NamedWriteableRegistry registry) {
public static List<AggregationBuilder> translateAggregation(AggregationBuilder source, NamedWriteableRegistry registry) {
if (source.getWriteableName().equals(DateHistogramAggregationBuilder.NAME)) {
return translateDateHistogram((DateHistogramAggregationBuilder) source, filterConditions, registry);
return translateDateHistogram((DateHistogramAggregationBuilder) source, registry);
} else if (source.getWriteableName().equals(HistogramAggregationBuilder.NAME)) {
return translateHistogram((HistogramAggregationBuilder) source, filterConditions, registry);
return translateHistogram((HistogramAggregationBuilder) source, registry);
} else if (RollupField.SUPPORTED_METRICS.contains(source.getWriteableName())) {
return translateVSLeaf((ValuesSourceAggregationBuilder.LeafOnly)source, registry);
} else if (source.getWriteableName().equals(TermsAggregationBuilder.NAME)) {
return translateTerms((TermsAggregationBuilder)source, filterConditions, registry);
return translateTerms((TermsAggregationBuilder)source, registry);
} else {
throw new IllegalArgumentException("Unable to translate aggregation tree into Rollup. Aggregation ["
+ source.getName() + "] is of type [" + source.getClass().getSimpleName() + "] which is " +
@ -195,22 +189,13 @@ public class RollupRequestTranslator {
* <li>Field: `{timestamp field}.date_histogram._count`</li>
* </ul>
* </li>
* <li>Add a filter condition:</li>
* <li>
* <ul>
* <li>Query type: TermQuery</li>
* <li>Field: `{timestamp_field}.date_histogram.interval`</li>
* <li>Value: `{source interval}`</li>
* </ul>
* </li>
* </ul>
*
*/
private static List<AggregationBuilder> translateDateHistogram(DateHistogramAggregationBuilder source,
List<QueryBuilder> filterConditions,
NamedWriteableRegistry registry) {
return translateVSAggBuilder(source, filterConditions, registry, () -> {
return translateVSAggBuilder(source, registry, () -> {
DateHistogramAggregationBuilder rolledDateHisto
= new DateHistogramAggregationBuilder(source.getName());
@ -220,13 +205,9 @@ public class RollupRequestTranslator {
rolledDateHisto.interval(source.interval());
}
String timezone = source.timeZone() == null ? DateTimeZone.UTC.toString() : source.timeZone().toString();
filterConditions.add(new TermQueryBuilder(RollupField.formatFieldName(source,
DateHistogramGroupConfig.TIME_ZONE), timezone));
ZoneId timeZone = source.timeZone() == null ? DateHistogramGroupConfig.DEFAULT_ZONEID_TIMEZONE : source.timeZone();
rolledDateHisto.timeZone(timeZone);
if (source.timeZone() != null) {
rolledDateHisto.timeZone(source.timeZone());
}
rolledDateHisto.offset(source.offset());
if (source.extendedBounds() != null) {
rolledDateHisto.extendedBounds(source.extendedBounds());
@ -248,14 +229,13 @@ public class RollupRequestTranslator {
* Notably, it adds a Sum metric to calculate the doc_count in each bucket.
*
* Conventions are identical to a date_histogram (excepting date-specific details), so see
* {@link #translateDateHistogram(DateHistogramAggregationBuilder, List, NamedWriteableRegistry)} for
* {@link #translateDateHistogram(DateHistogramAggregationBuilder, NamedWriteableRegistry)} for
* a complete list of conventions, examples, etc
*/
private static List<AggregationBuilder> translateHistogram(HistogramAggregationBuilder source,
List<QueryBuilder> filterConditions,
NamedWriteableRegistry registry) {
return translateVSAggBuilder(source, filterConditions, registry, () -> {
return translateVSAggBuilder(source, registry, () -> {
HistogramAggregationBuilder rolledHisto
= new HistogramAggregationBuilder(source.getName());
@ -328,10 +308,9 @@ public class RollupRequestTranslator {
*
*/
private static List<AggregationBuilder> translateTerms(TermsAggregationBuilder source,
List<QueryBuilder> filterConditions,
NamedWriteableRegistry registry) {
return translateVSAggBuilder(source, filterConditions, registry, () -> {
return translateVSAggBuilder(source, registry, () -> {
TermsAggregationBuilder rolledTerms
= new TermsAggregationBuilder(source.getName(), source.valueType());
rolledTerms.field(RollupField.formatFieldName(source, RollupField.VALUE));
@ -359,8 +338,6 @@ public class RollupRequestTranslator {
* ValueSourceBuilder. This method is called by all the agg-specific methods (e.g. translateDateHistogram())
*
* @param source The source aggregation that we wish to translate
* @param filterConditions A list of existing filter conditions, in case we need to add some
* for this particular agg
* @param registry Named registry for serializing leaf metrics. Not actually used by this method,
* but is passed downwards for leaf usage
* @param factory A factory closure that generates a new shallow clone of the `source`. E.g. if `source` is
@ -371,15 +348,14 @@ public class RollupRequestTranslator {
* @return the translated multi-bucket ValueSourceAggBuilder
*/
private static <T extends ValuesSourceAggregationBuilder> List<AggregationBuilder>
translateVSAggBuilder(ValuesSourceAggregationBuilder source, List<QueryBuilder> filterConditions,
NamedWriteableRegistry registry, Supplier<T> factory) {
translateVSAggBuilder(ValuesSourceAggregationBuilder source, NamedWriteableRegistry registry, Supplier<T> factory) {
T rolled = factory.get();
// Translate all subaggs and add to the newly translated agg
// NOTE: using for loop instead of stream because compiler explodes with a bug :/
for (AggregationBuilder subAgg : source.getSubAggregations()) {
List<AggregationBuilder> translated = translateAggregation(subAgg, filterConditions, registry);
List<AggregationBuilder> translated = translateAggregation(subAgg, registry);
for (AggregationBuilder t : translated) {
rolled.subAggregation(t);
}

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.rollup.action;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
@ -32,6 +33,8 @@ import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.time.DateUtils;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.license.LicenseUtils;
@ -57,6 +60,8 @@ public class TransportPutRollupJobAction extends TransportMasterNodeAction<PutRo
private final XPackLicenseState licenseState;
private final PersistentTasksService persistentTasksService;
private final Client client;
private static final DeprecationLogger deprecationLogger
= new DeprecationLogger(LogManager.getLogger(TransportPutRollupJobAction.class));
@Inject
public TransportPutRollupJobAction(TransportService transportService, ThreadPool threadPool,
@ -90,6 +95,7 @@ public class TransportPutRollupJobAction extends TransportMasterNodeAction<PutRo
}
XPackPlugin.checkReadyForXPackCustomMetadata(clusterState);
checkForDeprecatedTZ(request);
FieldCapabilitiesRequest fieldCapsRequest = new FieldCapabilitiesRequest()
.indices(request.getConfig().getIndexPattern())
@ -115,6 +121,15 @@ public class TransportPutRollupJobAction extends TransportMasterNodeAction<PutRo
});
}
static void checkForDeprecatedTZ(PutRollupJobAction.Request request) {
String timeZone = request.getConfig().getGroupConfig().getDateHistogram().getTimeZone();
String modernTZ = DateUtils.DEPRECATED_LONG_TIMEZONES.get(timeZone);
if (modernTZ != null) {
deprecationLogger.deprecated("Creating Rollup job [" + request.getConfig().getId() + "] with timezone ["
+ timeZone + "], but [" + timeZone + "] has been deprecated by the IANA. Use [" + modernTZ +"] instead.");
}
}
private static RollupJob createRollupJob(RollupJobConfig config, ThreadPool threadPool) {
// ensure we only filter for the allowed headers
Map<String, String> filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream()

View File

@ -173,10 +173,12 @@ public class TransportRollupSearchAction extends TransportAction<SearchRequest,
for (AggregationBuilder agg : sourceAgg.getAggregatorFactories()) {
// TODO this filter agg is now redundant given we filter on job ID
// in the query and the translator doesn't add any clauses anymore
List<QueryBuilder> filterConditions = new ArrayList<>(5);
// Translate the agg tree, and collect any potential filtering clauses
List<AggregationBuilder> translatedAgg = RollupRequestTranslator.translateAggregation(agg, filterConditions, registry);
List<AggregationBuilder> translatedAgg = RollupRequestTranslator.translateAggregation(agg, registry);
BoolQueryBuilder boolQuery = new BoolQueryBuilder();
filterConditions.forEach(boolQuery::must);

View File

@ -41,7 +41,6 @@ import org.elasticsearch.xpack.core.rollup.job.RollupIndexerJobStats;
import org.elasticsearch.xpack.core.rollup.job.RollupJob;
import org.elasticsearch.xpack.core.rollup.job.RollupJobConfig;
import org.elasticsearch.xpack.core.rollup.job.TermsGroupConfig;
import org.joda.time.DateTimeZone;
import java.time.ZoneId;
import java.util.ArrayList;
@ -311,13 +310,5 @@ public abstract class RollupIndexer extends AsyncTwoPhaseIndexer<Map<String, Obj
}
return Collections.unmodifiableList(builders);
}
private static DateTimeZone toDateTimeZone(final String timezone) {
try {
return DateTimeZone.forOffsetHours(Integer.parseInt(timezone));
} catch (NumberFormatException e) {
return DateTimeZone.forID(timezone);
}
}
}

View File

@ -25,6 +25,7 @@ import org.elasticsearch.xpack.core.rollup.job.RollupJobConfig;
import org.elasticsearch.xpack.core.rollup.job.TermsGroupConfig;
import org.joda.time.DateTimeZone;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
@ -662,6 +663,51 @@ public class RollupJobIdentifierUtilTests extends ESTestCase {
}
}
public void testObsoleteTimezone() {
// Job has "obsolete" timezone
DateHistogramGroupConfig dateHisto = new DateHistogramGroupConfig("foo", new DateHistogramInterval("1h"), null, "Canada/Mountain");
GroupConfig group = new GroupConfig(dateHisto);
RollupJobConfig job = new RollupJobConfig("foo", "index", "rollup", "*/5 * * * * ?", 10, group, emptyList(), null);
RollupJobCaps cap = new RollupJobCaps(job);
Set<RollupJobCaps> caps = singletonSet(cap);
DateHistogramAggregationBuilder builder = new DateHistogramAggregationBuilder("foo").field("foo")
.dateHistogramInterval(job.getGroupConfig().getDateHistogram().getInterval())
.timeZone(ZoneId.of("Canada/Mountain"));
Set<RollupJobCaps> bestCaps = RollupJobIdentifierUtils.findBestJobs(builder, caps);
assertThat(bestCaps.size(), equalTo(1));
builder = new DateHistogramAggregationBuilder("foo").field("foo")
.dateHistogramInterval(job.getGroupConfig().getDateHistogram().getInterval())
.timeZone(ZoneId.of("America/Edmonton"));
bestCaps = RollupJobIdentifierUtils.findBestJobs(builder, caps);
assertThat(bestCaps.size(), equalTo(1));
// now the reverse, job has "new" timezone
dateHisto = new DateHistogramGroupConfig("foo", new DateHistogramInterval("1h"), null, "America/Edmonton");
group = new GroupConfig(dateHisto);
job = new RollupJobConfig("foo", "index", "rollup", "*/5 * * * * ?", 10, group, emptyList(), null);
cap = new RollupJobCaps(job);
caps = singletonSet(cap);
builder = new DateHistogramAggregationBuilder("foo").field("foo")
.dateHistogramInterval(job.getGroupConfig().getDateHistogram().getInterval())
.timeZone(ZoneId.of("Canada/Mountain"));
bestCaps = RollupJobIdentifierUtils.findBestJobs(builder, caps);
assertThat(bestCaps.size(), equalTo(1));
builder = new DateHistogramAggregationBuilder("foo").field("foo")
.dateHistogramInterval(job.getGroupConfig().getDateHistogram().getInterval())
.timeZone(ZoneId.of("America/Edmonton"));
bestCaps = RollupJobIdentifierUtils.findBestJobs(builder, caps);
assertThat(bestCaps.size(), equalTo(1));
}
private static long getMillis(RollupJobCaps cap) {
for (RollupJobCaps.RollupFieldCaps fieldCaps : cap.getFieldCaps().values()) {
for (Map<String, Object> agg : fieldCaps.getAggs()) {

View File

@ -9,8 +9,6 @@ package org.elasticsearch.xpack.rollup;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
@ -33,7 +31,6 @@ import org.junit.Before;
import java.io.IOException;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
@ -65,9 +62,8 @@ public class RollupRequestTranslationTests extends ESTestCase {
.extendedBounds(new ExtendedBounds(0L, 1000L))
.subAggregation(new MaxAggregationBuilder("the_max").field("max_field"))
.subAggregation(new AvgAggregationBuilder("the_avg").field("avg_field"));
List<QueryBuilder> filterConditions = new ArrayList<>();
List<AggregationBuilder> translated = translateAggregation(histo, filterConditions, namedWriteableRegistry);
List<AggregationBuilder> translated = translateAggregation(histo, namedWriteableRegistry);
assertThat(translated.size(), equalTo(1));
assertThat(translated.get(0), Matchers.instanceOf(DateHistogramAggregationBuilder.class));
DateHistogramAggregationBuilder translatedHisto = (DateHistogramAggregationBuilder)translated.get(0);
@ -93,22 +89,6 @@ public class RollupRequestTranslationTests extends ESTestCase {
assertThat(subAggs.get("test_histo._count"), Matchers.instanceOf(SumAggregationBuilder.class));
assertThat(((SumAggregationBuilder)subAggs.get("test_histo._count")).field(),
equalTo("foo.date_histogram._count"));
assertThat(filterConditions.size(), equalTo(1));
for (QueryBuilder q : filterConditions) {
if (q instanceof TermQueryBuilder) {
switch (((TermQueryBuilder) q).fieldName()) {
case "foo.date_histogram.time_zone":
assertThat(((TermQueryBuilder) q).value(), equalTo("UTC"));
break;
default:
fail("Unexpected Term Query in filter conditions: [" + ((TermQueryBuilder) q).fieldName() + "]");
break;
}
} else {
fail("Unexpected query builder in filter conditions");
}
}
}
public void testFormattedDateHisto() {
@ -118,9 +98,8 @@ public class RollupRequestTranslationTests extends ESTestCase {
.extendedBounds(new ExtendedBounds(0L, 1000L))
.format("yyyy-MM-dd")
.subAggregation(new MaxAggregationBuilder("the_max").field("max_field"));
List<QueryBuilder> filterConditions = new ArrayList<>();
List<AggregationBuilder> translated = translateAggregation(histo, filterConditions, namedWriteableRegistry);
List<AggregationBuilder> translated = translateAggregation(histo, namedWriteableRegistry);
assertThat(translated.size(), equalTo(1));
assertThat(translated.get(0), Matchers.instanceOf(DateHistogramAggregationBuilder.class));
DateHistogramAggregationBuilder translatedHisto = (DateHistogramAggregationBuilder)translated.get(0);
@ -133,7 +112,6 @@ public class RollupRequestTranslationTests extends ESTestCase {
public void testSimpleMetric() {
int i = ESTestCase.randomIntBetween(0, 2);
List<AggregationBuilder> translated = new ArrayList<>();
List<QueryBuilder> filterConditions = new ArrayList<>();
Class clazz = null;
String fieldName = null;
@ -141,17 +119,17 @@ public class RollupRequestTranslationTests extends ESTestCase {
if (i == 0) {
translated = translateAggregation(new MaxAggregationBuilder("test_metric")
.field("foo"), filterConditions, namedWriteableRegistry);
.field("foo"), namedWriteableRegistry);
clazz = MaxAggregationBuilder.class;
fieldName = "foo.max.value";
} else if (i == 1) {
translated = translateAggregation(new MinAggregationBuilder("test_metric")
.field("foo"), filterConditions, namedWriteableRegistry);
.field("foo"), namedWriteableRegistry);
clazz = MinAggregationBuilder.class;
fieldName = "foo.min.value";
} else if (i == 2) {
translated = translateAggregation(new SumAggregationBuilder("test_metric")
.field("foo"), filterConditions, namedWriteableRegistry);
.field("foo"), namedWriteableRegistry);
clazz = SumAggregationBuilder.class;
fieldName = "foo.sum.value";
}
@ -160,14 +138,12 @@ public class RollupRequestTranslationTests extends ESTestCase {
assertThat(translated.get(0), Matchers.instanceOf(clazz));
assertThat((translated.get(0)).getName(), equalTo("test_metric"));
assertThat(((ValuesSourceAggregationBuilder)translated.get(0)).field(), equalTo(fieldName));
assertThat(filterConditions.size(), equalTo(0));
}
public void testUnsupportedMetric() {
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> translateAggregation(new StatsAggregationBuilder("test_metric")
.field("foo"), Collections.emptyList(), namedWriteableRegistry));
.field("foo"), namedWriteableRegistry));
assertThat(e.getMessage(), equalTo("Unable to translate aggregation tree into Rollup. Aggregation [test_metric] is of type " +
"[StatsAggregationBuilder] which is currently unsupported."));
}
@ -178,9 +154,8 @@ public class RollupRequestTranslationTests extends ESTestCase {
.field("foo")
.subAggregation(new MaxAggregationBuilder("the_max").field("max_field"))
.subAggregation(new AvgAggregationBuilder("the_avg").field("avg_field"));
List<QueryBuilder> filterConditions = new ArrayList<>();
List<AggregationBuilder> translated = translateAggregation(histo, filterConditions, namedWriteableRegistry);
List<AggregationBuilder> translated = translateAggregation(histo, namedWriteableRegistry);
assertThat(translated.size(), equalTo(1));
assertThat(translated.get(0), instanceOf(DateHistogramAggregationBuilder.class));
DateHistogramAggregationBuilder translatedHisto = (DateHistogramAggregationBuilder)translated.get(0);
@ -206,20 +181,6 @@ public class RollupRequestTranslationTests extends ESTestCase {
assertThat(subAggs.get("test_histo._count"), instanceOf(SumAggregationBuilder.class));
assertThat(((SumAggregationBuilder)subAggs.get("test_histo._count")).field(),
equalTo("foo.date_histogram._count"));
assertThat(filterConditions.size(), equalTo(1));
for (QueryBuilder q : filterConditions) {
if (q instanceof TermQueryBuilder) {
if (((TermQueryBuilder) q).fieldName().equals("foo.date_histogram.time_zone")) {
assertThat(((TermQueryBuilder) q).value(), equalTo("UTC"));
} else {
fail("Unexpected Term Query in filter conditions: [" + ((TermQueryBuilder) q).fieldName() + "]");
}
} else {
fail("Unexpected query builder in filter conditions");
}
}
}
public void testDateHistoLongIntervalWithMinMax() {
@ -228,9 +189,8 @@ public class RollupRequestTranslationTests extends ESTestCase {
.field("foo")
.subAggregation(new MaxAggregationBuilder("the_max").field("max_field"))
.subAggregation(new AvgAggregationBuilder("the_avg").field("avg_field"));
List<QueryBuilder> filterConditions = new ArrayList<>();
List<AggregationBuilder> translated = translateAggregation(histo, filterConditions, namedWriteableRegistry);
List<AggregationBuilder> translated = translateAggregation(histo, namedWriteableRegistry);
assertThat(translated.size(), equalTo(1));
assertThat(translated.get(0), instanceOf(DateHistogramAggregationBuilder.class));
DateHistogramAggregationBuilder translatedHisto = (DateHistogramAggregationBuilder)translated.get(0);
@ -256,20 +216,6 @@ public class RollupRequestTranslationTests extends ESTestCase {
assertThat(subAggs.get("test_histo._count"), instanceOf(SumAggregationBuilder.class));
assertThat(((SumAggregationBuilder)subAggs.get("test_histo._count")).field(),
equalTo("foo.date_histogram._count"));
assertThat(filterConditions.size(), equalTo(1));
for (QueryBuilder q : filterConditions) {
if (q instanceof TermQueryBuilder) {
if (((TermQueryBuilder) q).fieldName().equals("foo.date_histogram.time_zone")) {
assertThat(((TermQueryBuilder) q).value(), equalTo("UTC"));
} else {
fail("Unexpected Term Query in filter conditions: [" + ((TermQueryBuilder) q).fieldName() + "]");
}
} else {
fail("Unexpected query builder in filter conditions");
}
}
}
public void testDateHistoWithTimezone() {
@ -278,9 +224,8 @@ public class RollupRequestTranslationTests extends ESTestCase {
histo.interval(86400000)
.field("foo")
.timeZone(timeZone);
List<QueryBuilder> filterConditions = new ArrayList<>();
List<AggregationBuilder> translated = translateAggregation(histo, filterConditions, namedWriteableRegistry);
List<AggregationBuilder> translated = translateAggregation(histo, namedWriteableRegistry);
assertThat(translated.size(), equalTo(1));
assertThat(translated.get(0), instanceOf(DateHistogramAggregationBuilder.class));
DateHistogramAggregationBuilder translatedHisto = (DateHistogramAggregationBuilder)translated.get(0);
@ -288,25 +233,11 @@ public class RollupRequestTranslationTests extends ESTestCase {
assertThat(translatedHisto.interval(), equalTo(86400000L));
assertThat(translatedHisto.field(), equalTo("foo.date_histogram.timestamp"));
assertThat(translatedHisto.timeZone(), equalTo(timeZone));
assertThat(filterConditions.size(), equalTo(1));
for (QueryBuilder q : filterConditions) {
if (q instanceof TermQueryBuilder) {
if (((TermQueryBuilder) q).fieldName().equals("foo.date_histogram.time_zone")) {
assertThat(((TermQueryBuilder) q).value(), equalTo(timeZone.toString()));
} else {
fail("Unexpected Term Query in filter conditions: [" + ((TermQueryBuilder) q).fieldName() + "]");
}
} else {
fail("Unexpected query builder in filter conditions");
}
}
}
public void testAvgMetric() {
List<QueryBuilder> filterConditions = new ArrayList<>();
List<AggregationBuilder> translated = translateAggregation(new AvgAggregationBuilder("test_metric")
.field("foo"), filterConditions, namedWriteableRegistry);
.field("foo"), namedWriteableRegistry);
assertThat(translated.size(), equalTo(2));
Map<String, AggregationBuilder> metrics = translated.stream()
@ -319,8 +250,6 @@ public class RollupRequestTranslationTests extends ESTestCase {
assertThat(metrics.get("test_metric._count"), Matchers.instanceOf(SumAggregationBuilder.class));
assertThat(((SumAggregationBuilder)metrics.get("test_metric._count")).field(),
equalTo("foo.avg._count"));
assertThat(filterConditions.size(), equalTo(0));
}
public void testStringTerms() throws IOException {
@ -329,9 +258,8 @@ public class RollupRequestTranslationTests extends ESTestCase {
terms.field("foo")
.subAggregation(new MaxAggregationBuilder("the_max").field("max_field"))
.subAggregation(new AvgAggregationBuilder("the_avg").field("avg_field"));
List<QueryBuilder> filterConditions = new ArrayList<>();
List<AggregationBuilder> translated = translateAggregation(terms, filterConditions, namedWriteableRegistry);
List<AggregationBuilder> translated = translateAggregation(terms, namedWriteableRegistry);
assertThat(translated.size(), equalTo(1));
assertThat(translated.get(0), Matchers.instanceOf(TermsAggregationBuilder.class));
TermsAggregationBuilder translatedHisto = (TermsAggregationBuilder)translated.get(0);
@ -356,8 +284,6 @@ public class RollupRequestTranslationTests extends ESTestCase {
assertThat(subAggs.get("test_string_terms._count"), Matchers.instanceOf(SumAggregationBuilder.class));
assertThat(((SumAggregationBuilder)subAggs.get("test_string_terms._count")).field(),
equalTo("foo.terms._count"));
assertThat(filterConditions.size(), equalTo(0));
}
public void testBasicHisto() {
@ -368,9 +294,8 @@ public class RollupRequestTranslationTests extends ESTestCase {
.extendedBounds(0.0, 1000.0)
.subAggregation(new MaxAggregationBuilder("the_max").field("max_field"))
.subAggregation(new AvgAggregationBuilder("the_avg").field("avg_field"));
List<QueryBuilder> filterConditions = new ArrayList<>();
List<AggregationBuilder> translated = translateAggregation(histo, filterConditions, namedWriteableRegistry);
List<AggregationBuilder> translated = translateAggregation(histo, namedWriteableRegistry);
assertThat(translated.size(), equalTo(1));
assertThat(translated.get(0), Matchers.instanceOf(HistogramAggregationBuilder.class));
HistogramAggregationBuilder translatedHisto = (HistogramAggregationBuilder)translated.get(0);
@ -396,18 +321,6 @@ public class RollupRequestTranslationTests extends ESTestCase {
assertThat(((SumAggregationBuilder)subAggs.get("test_histo._count")).field(),
equalTo("foo.histogram._count"));
assertThat(filterConditions.size(), equalTo(0));
for (QueryBuilder q : filterConditions) {
if (q instanceof TermQueryBuilder) {
switch (((TermQueryBuilder) q).fieldName()) {
default:
fail("Unexpected Term Query in filter conditions: [" + ((TermQueryBuilder) q).fieldName() + "]");
break;
}
} else {
fail("Unexpected query builder in filter conditions");
}
}
}
public void testUnsupportedAgg() {
@ -415,10 +328,9 @@ public class RollupRequestTranslationTests extends ESTestCase {
geo.field("foo")
.subAggregation(new MaxAggregationBuilder("the_max").field("max_field"))
.subAggregation(new AvgAggregationBuilder("the_avg").field("avg_field"));
List<QueryBuilder> filterConditions = new ArrayList<>();
Exception e = expectThrows(RuntimeException.class,
() -> translateAggregation(geo, filterConditions, namedWriteableRegistry));
() -> translateAggregation(geo, namedWriteableRegistry));
assertThat(e.getMessage(), equalTo("Unable to translate aggregation tree into Rollup. Aggregation [test_geo] is of type " +
"[GeoDistanceAggregationBuilder] which is currently unsupported."));
}

View File

@ -23,9 +23,13 @@ import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers;
import org.elasticsearch.xpack.core.rollup.RollupField;
import org.elasticsearch.xpack.core.rollup.action.PutRollupJobAction;
import org.elasticsearch.xpack.core.rollup.job.DateHistogramGroupConfig;
import org.elasticsearch.xpack.core.rollup.job.GroupConfig;
import org.elasticsearch.xpack.core.rollup.job.RollupJob;
import org.elasticsearch.xpack.core.rollup.job.RollupJobConfig;
import org.elasticsearch.xpack.rollup.Rollup;
@ -424,4 +428,22 @@ public class PutJobStateMachineTests extends ESTestCase {
verify(tasksService).sendStartRequest(eq(job.getConfig().getId()), eq(RollupField.TASK_NAME), eq(job), any());
verify(tasksService).waitForPersistentTaskCondition(eq(job.getConfig().getId()), any(), any(), any());
}
public void testDeprecatedTimeZone() {
GroupConfig groupConfig = new GroupConfig(new DateHistogramGroupConfig("foo", new DateHistogramInterval("1h"), null, "Japan"));
RollupJobConfig config = new RollupJobConfig("foo", randomAlphaOfLength(5), "rollup", ConfigTestHelpers.randomCron(),
100, groupConfig, Collections.emptyList(), null);
PutRollupJobAction.Request request = new PutRollupJobAction.Request(config);
TransportPutRollupJobAction.checkForDeprecatedTZ(request);
assertWarnings("Creating Rollup job [foo] with timezone [Japan], but [Japan] has been deprecated by the IANA. " +
"Use [Asia/Tokyo] instead.");
}
public void testTimeZone() {
GroupConfig groupConfig = new GroupConfig(new DateHistogramGroupConfig("foo", new DateHistogramInterval("1h"), null, "EST"));
RollupJobConfig config = new RollupJobConfig("foo", randomAlphaOfLength(5), "rollup", ConfigTestHelpers.randomCron(),
100, groupConfig, Collections.emptyList(), null);
PutRollupJobAction.Request request = new PutRollupJobAction.Request(config);
TransportPutRollupJobAction.checkForDeprecatedTZ(request);
}
}

View File

@ -118,7 +118,7 @@ public class SearchActionTests extends ESTestCase {
assertThat(e.getMessage(), equalTo("Unsupported Query in search request: [match_phrase]"));
}
public void testRange() {
public void testRangeTimezoneUTC() {
final GroupConfig groupConfig = new GroupConfig(new DateHistogramGroupConfig("foo", new DateHistogramInterval("1h")));
final RollupJobConfig config = new RollupJobConfig("foo", "index", "rollup", "*/5 * * * * ?", 10, groupConfig, emptyList(), null);
RollupJobCaps cap = new RollupJobCaps(config);
@ -127,6 +127,7 @@ public class SearchActionTests extends ESTestCase {
QueryBuilder rewritten = TransportRollupSearchAction.rewriteQuery(new RangeQueryBuilder("foo").gt(1).timeZone("UTC"), caps);
assertThat(rewritten, instanceOf(RangeQueryBuilder.class));
assertThat(((RangeQueryBuilder)rewritten).fieldName(), equalTo("foo.date_histogram.timestamp"));
assertThat(((RangeQueryBuilder)rewritten).timeZone(), equalTo("UTC"));
}
public void testRangeNullTimeZone() {
@ -138,6 +139,7 @@ public class SearchActionTests extends ESTestCase {
QueryBuilder rewritten = TransportRollupSearchAction.rewriteQuery(new RangeQueryBuilder("foo").gt(1), caps);
assertThat(rewritten, instanceOf(RangeQueryBuilder.class));
assertThat(((RangeQueryBuilder)rewritten).fieldName(), equalTo("foo.date_histogram.timestamp"));
assertNull(((RangeQueryBuilder)rewritten).timeZone());
}
public void testRangeDifferentTZ() {

View File

@ -90,6 +90,11 @@ public class ConfigTests extends ESTestCase {
assertThat(e.getMessage(), equalTo("Unknown time-zone ID: FOO"));
}
public void testObsoleteTimeZone() {
DateHistogramGroupConfig config = new DateHistogramGroupConfig("foo", DateHistogramInterval.HOUR, null, "Canada/Mountain");
assertThat(config.getTimeZone(), equalTo("Canada/Mountain"));
}
public void testEmptyHistoField() {
Exception e = expectThrows(IllegalArgumentException.class, () -> new HistogramGroupConfig(1L, (String[]) null));
assertThat(e.getMessage(), equalTo("Fields must have at least one value"));

View File

@ -47,6 +47,7 @@ import org.joda.time.DateTime;
import org.mockito.stubbing.Answer;
import java.io.IOException;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -561,6 +562,89 @@ public class IndexerUtilsTests extends AggregatorTestCase {
}
}
public void testTimezone() throws IOException {
String indexName = randomAlphaOfLengthBetween(1, 10);
RollupIndexerJobStats stats = new RollupIndexerJobStats(0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
String timestampField = "the_histo";
String valueField = "the_avg";
Directory directory = newDirectory();
RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory);
{
Document document = new Document();
long timestamp = 1443659400000L; // 2015-10-01T00:30:00Z
document.add(new SortedNumericDocValuesField(timestampField, timestamp));
document.add(new LongPoint(timestampField, timestamp));
document.add(new SortedNumericDocValuesField(valueField, randomIntBetween(1, 100)));
indexWriter.addDocument(document);
}
{
Document document = new Document();
long timestamp = 1443663000000L; // 2015-10-01T01:30:00Z
document.add(new SortedNumericDocValuesField(timestampField, timestamp));
document.add(new LongPoint(timestampField, timestamp));
document.add(new SortedNumericDocValuesField(valueField, randomIntBetween(1, 100)));
indexWriter.addDocument(document);
}
indexWriter.close();
IndexReader indexReader = DirectoryReader.open(directory);
IndexSearcher indexSearcher = newIndexSearcher(indexReader);
DateFieldMapper.Builder builder = new DateFieldMapper.Builder(timestampField);
DateFieldMapper.DateFieldType timestampFieldType = builder.fieldType();
timestampFieldType.setHasDocValues(true);
timestampFieldType.setName(timestampField);
MappedFieldType valueFieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
valueFieldType.setName(valueField);
valueFieldType.setHasDocValues(true);
valueFieldType.setName(valueField);
// Setup the composite agg
DateHistogramValuesSourceBuilder dateHisto
= new DateHistogramValuesSourceBuilder("the_histo." + DateHistogramAggregationBuilder.NAME)
.field(timestampField)
.dateHistogramInterval(new DateHistogramInterval("1d"))
.timeZone(ZoneId.of("-01:00", ZoneId.SHORT_IDS)); // adds a timezone so that we aren't on default UTC
CompositeAggregationBuilder compositeBuilder = new CompositeAggregationBuilder(RollupIndexer.AGGREGATION_NAME,
singletonList(dateHisto));
MetricConfig metricConfig = new MetricConfig(valueField, singletonList("max"));
List<AggregationBuilder> metricAgg = createAggregationBuilders(singletonList(metricConfig));
metricAgg.forEach(compositeBuilder::subAggregation);
Aggregator aggregator = createAggregator(compositeBuilder, indexSearcher, timestampFieldType, valueFieldType);
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
aggregator.postCollection();
CompositeAggregation composite = (CompositeAggregation) aggregator.buildAggregation(0L);
indexReader.close();
directory.close();
final GroupConfig groupConfig = randomGroupConfig(random());
List<IndexRequest> docs = IndexerUtils.processBuckets(composite, indexName, stats, groupConfig, "foo", randomBoolean());
assertThat(docs.size(), equalTo(2));
Map<String, Object> map = docs.get(0).sourceAsMap();
assertNotNull(map.get(valueField + "." + MaxAggregationBuilder.NAME + "." + RollupField.VALUE));
assertThat(map.get("the_histo." + DateHistogramAggregationBuilder.NAME + "." + RollupField.COUNT_FIELD), equalTo(1));
assertThat(map.get("the_histo." + DateHistogramAggregationBuilder.NAME + "." + RollupField.TIMESTAMP),
equalTo(1443574800000L)); // 2015-09-30T00:00:00.000-01:00
map = docs.get(1).sourceAsMap();
assertNotNull(map.get(valueField + "." + MaxAggregationBuilder.NAME + "." + RollupField.VALUE));
assertThat(map.get("the_histo." + DateHistogramAggregationBuilder.NAME + "." + RollupField.COUNT_FIELD), equalTo(1));
assertThat(map.get("the_histo." + DateHistogramAggregationBuilder.NAME + "." + RollupField.TIMESTAMP),
equalTo(1443661200000L)); // 2015-10-01T00:00:00.000-01:00
}
interface Mock {
List<? extends CompositeAggregation.Bucket> getBuckets();
}

View File

@ -881,6 +881,308 @@ setup:
interval: "1h"
time_zone: "UTC"
---
"Obsolete Timezone":
- skip:
version: " - 7.0.99"
reason: "IANA TZ deprecations in 7.1"
features: "warnings"
- do:
indices.create:
index: tz
body:
mappings:
properties:
timestamp:
type: date
partition:
type: keyword
price:
type: integer
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
warnings:
- "Creating Rollup job [tz] with timezone [Canada/Mountain], but [Canada/Mountain] has been deprecated by the IANA. Use [America/Edmonton] instead."
rollup.put_job:
id: tz
body: >
{
"index_pattern": "tz",
"rollup_index": "tz_rollup",
"cron": "*/30 * * * * ?",
"page_size" :10,
"groups" : {
"date_histogram": {
"field": "timestamp",
"interval": "5m",
"time_zone": "Canada/Mountain"
},
"terms": {
"fields": ["partition"]
}
},
"metrics": [
{
"field": "price",
"metrics": ["max"]
}
]
}
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
bulk:
refresh: true
body:
- index:
_index: "tz_rollup"
_type: "_doc"
- timestamp.date_histogram.timestamp: 1531221000000
timestamp.date_histogram.interval: "5m"
timestamp.date_histogram.time_zone: "America/Edmonton"
timestamp.date_histogram._count: 1
partition.terms.value: "a"
partition.terms._count: 1
price.max.value: 1
"_rollup.id": "tz"
"_rollup.version": 2
- index:
_index: "tz_rollup"
_type: "_doc"
- timestamp.date_histogram.timestamp: 1531221300000
timestamp.date_histogram.interval: "5m"
timestamp.date_histogram.time_zone: "America/Edmonton"
timestamp.date_histogram._count: 2
partition.terms.value: "b"
partition.terms._count: 2
price.max.value: 2
"_rollup.id": "tz"
"_rollup.version": 2
- index:
_index: "tz_rollup"
_type: "_doc"
- timestamp.date_histogram.timestamp: 1531221600000
timestamp.date_histogram.interval: "5m"
timestamp.date_histogram.time_zone: "America/Edmonton"
timestamp.date_histogram._count: 10
partition.terms.value: "a"
partition.terms._count: 10
price.max.value: 3
"_rollup.id": "tz"
"_rollup.version": 2
- do:
rollup.rollup_search:
index: "tz_rollup"
body:
size: 0
aggs:
histo:
date_histogram:
field: "timestamp"
interval: "5m"
time_zone: "America/Edmonton"
aggs:
the_max:
max:
field: "price"
- length: { aggregations.histo.buckets: 3 }
- match: { aggregations.histo.buckets.0.key_as_string: "2018-07-10T05:10:00.000-06:00" }
- match: { aggregations.histo.buckets.0.doc_count: 1 }
- match: { aggregations.histo.buckets.0.the_max.value: 1 }
- match: { aggregations.histo.buckets.1.key_as_string: "2018-07-10T05:15:00.000-06:00" }
- match: { aggregations.histo.buckets.1.doc_count: 2 }
- match: { aggregations.histo.buckets.1.the_max.value: 2 }
- match: { aggregations.histo.buckets.2.key_as_string: "2018-07-10T05:20:00.000-06:00" }
- match: { aggregations.histo.buckets.2.doc_count: 10 }
- match: { aggregations.histo.buckets.2.the_max.value: 3 }
- do:
rollup.rollup_search:
index: "tz_rollup"
body:
size: 0
aggs:
histo:
date_histogram:
field: "timestamp"
interval: "5m"
time_zone: "Canada/Mountain"
aggs:
the_max:
max:
field: "price"
- length: { aggregations.histo.buckets: 3 }
- match: { aggregations.histo.buckets.0.key_as_string: "2018-07-10T05:10:00.000-06:00" }
- match: { aggregations.histo.buckets.0.doc_count: 1 }
- match: { aggregations.histo.buckets.0.the_max.value: 1 }
- match: { aggregations.histo.buckets.1.key_as_string: "2018-07-10T05:15:00.000-06:00" }
- match: { aggregations.histo.buckets.1.doc_count: 2 }
- match: { aggregations.histo.buckets.1.the_max.value: 2 }
- match: { aggregations.histo.buckets.2.key_as_string: "2018-07-10T05:20:00.000-06:00" }
- match: { aggregations.histo.buckets.2.doc_count: 10 }
- match: { aggregations.histo.buckets.2.the_max.value: 3 }
---
"Obsolete BWC Timezone":
- skip:
version: " - 7.0.99"
reason: "IANA TZ deprecations in 7.1"
- do:
indices.create:
index: tz_rollup
body:
settings:
number_of_shards: 1
number_of_replicas: 0
mappings:
properties:
partition.terms.value:
type: keyword
partition.terms._count:
type: long
timestamp.date_histogram.time_zone:
type: keyword
timestamp.date_histogram.interval:
type: keyword
timestamp.date_histogram.timestamp:
type: date
timestamp.date_histogram._count:
type: long
price.max.value:
type: double
_rollup.id:
type: keyword
_rollup.version:
type: long
_meta:
_rollup:
sensor:
cron: "* * * * * ?"
rollup_index: "tz_rollup"
index_pattern: "tz"
timeout: "20s"
page_size: 1000
groups:
date_histogram:
field: "timestamp"
interval: "5m"
time_zone: "Canada/Mountain"
terms:
fields:
- "partition"
id: tz
metrics:
- field: "price"
metrics:
- max
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
bulk:
refresh: true
body:
- index:
_index: "tz_rollup"
_type: "_doc"
- timestamp.date_histogram.timestamp: 1531221000000
timestamp.date_histogram.interval: "5m"
timestamp.date_histogram.time_zone: "Canada/Mountain"
timestamp.date_histogram._count: 1
partition.terms.value: "a"
partition.terms._count: 1
price.max.value: 1
"_rollup.id": "tz"
"_rollup.version": 2
- index:
_index: "tz_rollup"
_type: "_doc"
- timestamp.date_histogram.timestamp: 1531221300000
timestamp.date_histogram.interval: "5m"
timestamp.date_histogram.time_zone: "Canada/Mountain"
timestamp.date_histogram._count: 2
partition.terms.value: "b"
partition.terms._count: 2
price.max.value: 2
"_rollup.id": "tz"
"_rollup.version": 2
- index:
_index: "tz_rollup"
_type: "_doc"
- timestamp.date_histogram.timestamp: 1531221600000
timestamp.date_histogram.interval: "5m"
timestamp.date_histogram.time_zone: "Canada/Mountain"
timestamp.date_histogram._count: 10
partition.terms.value: "a"
partition.terms._count: 10
price.max.value: 3
"_rollup.id": "tz"
"_rollup.version": 2
- do:
rollup.rollup_search:
index: "tz_rollup"
body:
size: 0
aggs:
histo:
date_histogram:
field: "timestamp"
interval: "5m"
time_zone: "America/Edmonton"
aggs:
the_max:
max:
field: "price"
- length: { aggregations.histo.buckets: 3 }
- match: { aggregations.histo.buckets.0.key_as_string: "2018-07-10T05:10:00.000-06:00" }
- match: { aggregations.histo.buckets.0.doc_count: 1 }
- match: { aggregations.histo.buckets.0.the_max.value: 1 }
- match: { aggregations.histo.buckets.1.key_as_string: "2018-07-10T05:15:00.000-06:00" }
- match: { aggregations.histo.buckets.1.doc_count: 2 }
- match: { aggregations.histo.buckets.1.the_max.value: 2 }
- match: { aggregations.histo.buckets.2.key_as_string: "2018-07-10T05:20:00.000-06:00" }
- match: { aggregations.histo.buckets.2.doc_count: 10 }
- match: { aggregations.histo.buckets.2.the_max.value: 3 }
- do:
rollup.rollup_search:
index: "tz_rollup"
body:
size: 0
aggs:
histo:
date_histogram:
field: "timestamp"
interval: "5m"
time_zone: "Canada/Mountain"
aggs:
the_max:
max:
field: "price"
- length: { aggregations.histo.buckets: 3 }
- match: { aggregations.histo.buckets.0.key_as_string: "2018-07-10T05:10:00.000-06:00" }
- match: { aggregations.histo.buckets.0.doc_count: 1 }
- match: { aggregations.histo.buckets.0.the_max.value: 1 }
- match: { aggregations.histo.buckets.1.key_as_string: "2018-07-10T05:15:00.000-06:00" }
- match: { aggregations.histo.buckets.1.doc_count: 2 }
- match: { aggregations.histo.buckets.1.the_max.value: 2 }
- match: { aggregations.histo.buckets.2.key_as_string: "2018-07-10T05:20:00.000-06:00" }
- match: { aggregations.histo.buckets.2.doc_count: 10 }
- match: { aggregations.histo.buckets.2.the_max.value: 3 }
---
"Search with typed_keys":
@ -914,3 +1216,5 @@ setup:
- match: { aggregations.date_histogram#histo.buckets.3.key_as_string: "2017-01-01T08:00:00.000Z" }
- match: { aggregations.date_histogram#histo.buckets.3.doc_count: 20 }
- match: { aggregations.date_histogram#histo.buckets.3.max#the_max.value: 4 }