Remove offset rounding
This is in favour of doing the offset calculations in the date histogram
This commit is contained in:
parent
c14155e4a8
commit
b6ef99195d
|
@ -72,8 +72,6 @@ public abstract class Rounding implements Streamable {
|
|||
|
||||
private DateTimeZone timeZone = DateTimeZone.UTC;
|
||||
|
||||
private long offset;
|
||||
|
||||
public Builder(DateTimeUnit unit) {
|
||||
this.unit = unit;
|
||||
this.interval = -1;
|
||||
|
@ -94,11 +92,6 @@ public abstract class Rounding implements Streamable {
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder offset(long offset) {
|
||||
this.offset = offset;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Rounding build() {
|
||||
Rounding timeZoneRounding;
|
||||
if (unit != null) {
|
||||
|
@ -106,9 +99,6 @@ public abstract class Rounding implements Streamable {
|
|||
} else {
|
||||
timeZoneRounding = new TimeIntervalRounding(interval, timeZone);
|
||||
}
|
||||
if (offset != 0) {
|
||||
timeZoneRounding = new OffsetRounding(timeZoneRounding, offset);
|
||||
}
|
||||
return timeZoneRounding;
|
||||
}
|
||||
}
|
||||
|
@ -330,68 +320,6 @@ public abstract class Rounding implements Streamable {
|
|||
}
|
||||
}
|
||||
|
||||
public static class OffsetRounding extends Rounding {
|
||||
|
||||
static final byte ID = 8;
|
||||
|
||||
private Rounding rounding;
|
||||
|
||||
private long offset;
|
||||
|
||||
OffsetRounding() { // for serialization
|
||||
}
|
||||
|
||||
public OffsetRounding(Rounding intervalRounding, long offset) {
|
||||
this.rounding = intervalRounding;
|
||||
this.offset = offset;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte id() {
|
||||
return ID;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long round(long value) {
|
||||
return rounding.round(value - offset) + offset;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long nextRoundingValue(long value) {
|
||||
return rounding.nextRoundingValue(value - offset) + offset;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
rounding = Rounding.Streams.read(in);
|
||||
offset = in.readLong();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
Rounding.Streams.write(rounding, out);
|
||||
out.writeLong(offset);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(rounding, offset);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
OffsetRounding other = (OffsetRounding) obj;
|
||||
return Objects.equals(rounding, other.rounding)
|
||||
&& Objects.equals(offset, other.offset);
|
||||
}
|
||||
}
|
||||
|
||||
public static class Streams {
|
||||
|
||||
public static void write(Rounding rounding, StreamOutput out) throws IOException {
|
||||
|
@ -405,7 +333,6 @@ public abstract class Rounding implements Streamable {
|
|||
switch (id) {
|
||||
case TimeUnitRounding.ID: rounding = new TimeUnitRounding(); break;
|
||||
case TimeIntervalRounding.ID: rounding = new TimeIntervalRounding(); break;
|
||||
case OffsetRounding.ID: rounding = new OffsetRounding(); break;
|
||||
default: throw new ElasticsearchException("unknown rounding id [" + id + "]");
|
||||
}
|
||||
rounding.readFrom(in);
|
||||
|
|
|
@ -60,14 +60,17 @@ class DateHistogramAggregator extends BucketsAggregator {
|
|||
private final ExtendedBounds extendedBounds;
|
||||
|
||||
private final LongHash bucketOrds;
|
||||
private long offset;
|
||||
|
||||
public DateHistogramAggregator(String name, AggregatorFactories factories, Rounding rounding, InternalOrder order, boolean keyed,
|
||||
public DateHistogramAggregator(String name, AggregatorFactories factories, Rounding rounding, long offset, InternalOrder order,
|
||||
boolean keyed,
|
||||
long minDocCount, @Nullable ExtendedBounds extendedBounds, @Nullable ValuesSource.Numeric valuesSource,
|
||||
DocValueFormat formatter, AggregationContext aggregationContext,
|
||||
Aggregator parent, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
|
||||
|
||||
super(name, factories, aggregationContext, parent, pipelineAggregators, metaData);
|
||||
this.rounding = rounding;
|
||||
this.offset = offset;
|
||||
this.order = order;
|
||||
this.keyed = keyed;
|
||||
this.minDocCount = minDocCount;
|
||||
|
@ -100,7 +103,7 @@ class DateHistogramAggregator extends BucketsAggregator {
|
|||
long previousRounded = Long.MIN_VALUE;
|
||||
for (int i = 0; i < valuesCount; ++i) {
|
||||
long value = values.valueAt(i);
|
||||
long rounded = rounding.round(value);
|
||||
long rounded = rounding.round(value - offset) + offset;
|
||||
assert rounded >= previousRounded;
|
||||
if (rounded == previousRounded) {
|
||||
continue;
|
||||
|
@ -133,7 +136,7 @@ class DateHistogramAggregator extends BucketsAggregator {
|
|||
InternalDateHistogram.EmptyBucketInfo emptyBucketInfo = minDocCount == 0
|
||||
? new InternalDateHistogram.EmptyBucketInfo(rounding, buildEmptySubAggregations(), extendedBounds)
|
||||
: null;
|
||||
return new InternalDateHistogram(name, buckets, order, minDocCount, emptyBucketInfo, formatter, keyed,
|
||||
return new InternalDateHistogram(name, buckets, order, minDocCount, offset, emptyBucketInfo, formatter, keyed,
|
||||
pipelineAggregators(), metaData());
|
||||
}
|
||||
|
||||
|
@ -142,7 +145,7 @@ class DateHistogramAggregator extends BucketsAggregator {
|
|||
InternalDateHistogram.EmptyBucketInfo emptyBucketInfo = minDocCount == 0
|
||||
? new InternalDateHistogram.EmptyBucketInfo(rounding, buildEmptySubAggregations(), extendedBounds)
|
||||
: null;
|
||||
return new InternalDateHistogram(name, Collections.emptyList(), order, minDocCount, emptyBucketInfo, formatter, keyed,
|
||||
return new InternalDateHistogram(name, Collections.emptyList(), order, minDocCount, offset, emptyBucketInfo, formatter, keyed,
|
||||
pipelineAggregators(), metaData());
|
||||
}
|
||||
|
||||
|
|
|
@ -111,7 +111,7 @@ public final class DateHistogramAggregatorFactory
|
|||
if (timeZone() != null) {
|
||||
tzRoundingBuilder.timeZone(timeZone());
|
||||
}
|
||||
Rounding rounding = tzRoundingBuilder.offset(offset).build();
|
||||
Rounding rounding = tzRoundingBuilder.build();
|
||||
return rounding;
|
||||
}
|
||||
|
||||
|
@ -137,7 +137,7 @@ public final class DateHistogramAggregatorFactory
|
|||
// parse any string bounds to longs and round them
|
||||
roundedBounds = extendedBounds.parseAndValidate(name, context.searchContext(), config.format()).round(rounding);
|
||||
}
|
||||
return new DateHistogramAggregator(name, factories, rounding, order, keyed, minDocCount, roundedBounds, valuesSource,
|
||||
return new DateHistogramAggregator(name, factories, rounding, offset, order, keyed, minDocCount, roundedBounds, valuesSource,
|
||||
config.format(), context, parent, pipelineAggregators, metaData);
|
||||
}
|
||||
|
||||
|
|
|
@ -178,14 +178,17 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
|
|||
private final DocValueFormat format;
|
||||
private final boolean keyed;
|
||||
private final long minDocCount;
|
||||
private final long offset;
|
||||
private final EmptyBucketInfo emptyBucketInfo;
|
||||
|
||||
InternalDateHistogram(String name, List<Bucket> buckets, InternalOrder order, long minDocCount, EmptyBucketInfo emptyBucketInfo,
|
||||
InternalDateHistogram(String name, List<Bucket> buckets, InternalOrder order, long minDocCount, long offset,
|
||||
EmptyBucketInfo emptyBucketInfo,
|
||||
DocValueFormat formatter, boolean keyed, List<PipelineAggregator> pipelineAggregators,
|
||||
Map<String, Object> metaData) {
|
||||
super(name, pipelineAggregators, metaData);
|
||||
this.buckets = buckets;
|
||||
this.order = order;
|
||||
this.offset = offset;
|
||||
assert (minDocCount == 0) == (emptyBucketInfo != null);
|
||||
this.minDocCount = minDocCount;
|
||||
this.emptyBucketInfo = emptyBucketInfo;
|
||||
|
@ -205,6 +208,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
|
|||
} else {
|
||||
emptyBucketInfo = null;
|
||||
}
|
||||
offset = in.readLong();
|
||||
format = in.readNamedWriteable(DocValueFormat.class);
|
||||
keyed = in.readBoolean();
|
||||
buckets = in.readList(stream -> new Bucket(stream, keyed, format));
|
||||
|
@ -217,6 +221,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
|
|||
if (minDocCount == 0) {
|
||||
emptyBucketInfo.writeTo(out);
|
||||
}
|
||||
out.writeLong(offset);
|
||||
out.writeNamedWriteable(format);
|
||||
out.writeBoolean(keyed);
|
||||
out.writeList(buckets);
|
||||
|
@ -234,7 +239,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
|
|||
|
||||
@Override
|
||||
public InternalDateHistogram create(List<Bucket> buckets) {
|
||||
return new InternalDateHistogram(name, buckets, order, minDocCount, emptyBucketInfo, format,
|
||||
return new InternalDateHistogram(name, buckets, order, minDocCount, offset, emptyBucketInfo, format,
|
||||
keyed, pipelineAggregators(), metaData);
|
||||
}
|
||||
|
||||
|
@ -328,7 +333,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
|
|||
long max = bounds.getMax();
|
||||
while (key <= max) {
|
||||
iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs));
|
||||
key = emptyBucketInfo.rounding.nextRoundingValue(key);
|
||||
key = nextKey(key).longValue();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
@ -337,7 +342,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
|
|||
if (key < firstBucket.key) {
|
||||
while (key < firstBucket.key) {
|
||||
iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs));
|
||||
key = emptyBucketInfo.rounding.nextRoundingValue(key);
|
||||
key = nextKey(key).longValue();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -349,10 +354,10 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
|
|||
while (iter.hasNext()) {
|
||||
Bucket nextBucket = list.get(iter.nextIndex());
|
||||
if (lastBucket != null) {
|
||||
long key = emptyBucketInfo.rounding.nextRoundingValue(lastBucket.key);
|
||||
long key = nextKey(lastBucket.key).longValue();
|
||||
while (key < nextBucket.key) {
|
||||
iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs));
|
||||
key = emptyBucketInfo.rounding.nextRoundingValue(key);
|
||||
key = nextKey(key).longValue();
|
||||
}
|
||||
assert key == nextBucket.key;
|
||||
}
|
||||
|
@ -393,7 +398,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
|
|||
CollectionUtil.introSort(reducedBuckets, order.comparator());
|
||||
}
|
||||
|
||||
return new InternalDateHistogram(getName(), reducedBuckets, order, minDocCount, emptyBucketInfo,
|
||||
return new InternalDateHistogram(getName(), reducedBuckets, order, minDocCount, offset, emptyBucketInfo,
|
||||
format, keyed, pipelineAggregators(), getMetaData());
|
||||
}
|
||||
|
||||
|
@ -424,7 +429,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
|
|||
|
||||
@Override
|
||||
public Number nextKey(Number key) {
|
||||
return emptyBucketInfo.rounding.nextRoundingValue(key.longValue());
|
||||
return emptyBucketInfo.rounding.nextRoundingValue(key.longValue() - offset) + offset;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -435,7 +440,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
|
|||
buckets2.add((Bucket) b);
|
||||
}
|
||||
buckets2 = Collections.unmodifiableList(buckets2);
|
||||
return new InternalDateHistogram(name, buckets2, order, minDocCount, emptyBucketInfo, format,
|
||||
return new InternalDateHistogram(name, buckets2, order, minDocCount, offset, emptyBucketInfo, format,
|
||||
keyed, pipelineAggregators(), getMetaData());
|
||||
}
|
||||
|
||||
|
|
|
@ -1,69 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.rounding;
|
||||
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.joda.time.DateTimeZone;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
|
||||
public class OffsetRoundingTests extends ESTestCase {
|
||||
|
||||
/**
|
||||
* Simple test case to illustrate how Rounding.Offset works on readable input.
|
||||
* offset shifts input value back before rounding (so here 6 - 7 -> -1)
|
||||
* then shifts rounded Value back (here -10 -> -3)
|
||||
*/
|
||||
public void testOffsetRounding() {
|
||||
final long interval = 10;
|
||||
final long offset = 7;
|
||||
Rounding.OffsetRounding rounding = new Rounding.OffsetRounding(
|
||||
new Rounding.TimeIntervalRounding(interval, DateTimeZone.UTC), offset);
|
||||
assertEquals(-3, rounding.round(6));
|
||||
assertEquals(7, rounding.nextRoundingValue(-3));
|
||||
assertEquals(7, rounding.round(7));
|
||||
assertEquals(17, rounding.nextRoundingValue(7));
|
||||
assertEquals(7, rounding.round(16));
|
||||
assertEquals(17, rounding.round(17));
|
||||
assertEquals(27, rounding.nextRoundingValue(17));
|
||||
}
|
||||
|
||||
/**
|
||||
* test OffsetRounding with an internal interval rounding on random inputs
|
||||
*/
|
||||
public void testOffsetRoundingRandom() {
|
||||
for (int i = 0; i < 1000; ++i) {
|
||||
final long interval = randomIntBetween(1, 100);
|
||||
Rounding internalRounding = new Rounding.TimeIntervalRounding(interval, DateTimeZone.UTC);
|
||||
final long offset = randomIntBetween(-100, 100);
|
||||
Rounding.OffsetRounding rounding = new Rounding.OffsetRounding(internalRounding, offset);
|
||||
long safetyMargin = Math.abs(interval) + Math.abs(offset); // to prevent range overflow
|
||||
long value = Math.max(randomLong() - safetyMargin, Long.MIN_VALUE + safetyMargin);
|
||||
final long r_value = rounding.round(value);
|
||||
final long nextRoundingValue = rounding.nextRoundingValue(r_value);
|
||||
assertThat("Rounding should be idempotent", r_value, equalTo(rounding.round(r_value)));
|
||||
assertThat("Rounded value smaller than unrounded, regardless of offset", r_value - offset, lessThanOrEqualTo(value - offset));
|
||||
assertThat("Rounded value <= value < next interval start", r_value + interval, greaterThan(value));
|
||||
assertThat("NextRounding value should be interval from rounded value", r_value + interval, equalTo(nextRoundingValue));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -55,10 +55,6 @@ public class TimeZoneRoundingTests extends ESTestCase {
|
|||
tzRounding = Rounding.builder(DateTimeUnit.WEEK_OF_WEEKYEAR).build();
|
||||
assertThat(tzRounding.round(time("2012-01-10T01:01:01")), isDate(time("2012-01-09T00:00:00.000Z"), tz));
|
||||
assertThat(tzRounding.nextRoundingValue(time("2012-01-09T00:00:00.000Z")), isDate(time("2012-01-16T00:00:00.000Z"), tz));
|
||||
|
||||
tzRounding = Rounding.builder(DateTimeUnit.WEEK_OF_WEEKYEAR).offset(-TimeValue.timeValueHours(24).millis()).build();
|
||||
assertThat(tzRounding.round(time("2012-01-10T01:01:01")), isDate(time("2012-01-08T00:00:00.000Z"), tz));
|
||||
assertThat(tzRounding.nextRoundingValue(time("2012-01-08T00:00:00.000Z")), isDate(time("2012-01-15T00:00:00.000Z"), tz));
|
||||
}
|
||||
|
||||
public void testUTCIntervalRounding() {
|
||||
|
|
Loading…
Reference in New Issue