HistogramAggregator: Finer-grained rounding.
The way `HistogramAggregator` works is that for every value, it is going to compute a rounded value, that basically looks like `(value / interval) * interval` and use it as a key in a hash table to aggregate counts. However, the exact rounded value is not needed yet at that stage, all we need is a value that uniquely identifies the bucket, such as `(value / interval)`. We could only multiply with `interval` again when building the bucket: this way the second step is only performed once per bucket instead of once per value. Although this looks like a micro optimization for the case that was just decribed, it makes more sense with the date rounding implementations that we have that are more CPU-intensive. Close #4800
This commit is contained in:
parent
92a026b3b9
commit
1047267021
|
@ -28,17 +28,29 @@ import java.io.IOException;
|
||||||
/**
|
/**
|
||||||
* A strategy for rounding long values.
|
* A strategy for rounding long values.
|
||||||
*/
|
*/
|
||||||
public interface Rounding extends Streamable {
|
public abstract class Rounding implements Streamable {
|
||||||
|
|
||||||
byte id();
|
public abstract byte id();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Rounds the given value.
|
* Given a value, compute a key that uniquely identifies the rounded value although it is not necessarily equal to the rounding value itself.
|
||||||
|
*/
|
||||||
|
public abstract long roundKey(long value);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compute the rounded value given the key that identifies it.
|
||||||
|
*/
|
||||||
|
public abstract long valueForKey(long key);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Rounds the given value, equivalent to calling <code>roundValue(roundKey(value))</code>.
|
||||||
*
|
*
|
||||||
* @param value The value to round.
|
* @param value The value to round.
|
||||||
* @return The rounded value.
|
* @return The rounded value.
|
||||||
*/
|
*/
|
||||||
long round(long value);
|
public final long round(long value) {
|
||||||
|
return valueForKey(roundKey(value));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Given the rounded value (which was potentially generated by {@link #round(long)}, returns the next rounding value. For example, with
|
* Given the rounded value (which was potentially generated by {@link #round(long)}, returns the next rounding value. For example, with
|
||||||
|
@ -47,14 +59,14 @@ public interface Rounding extends Streamable {
|
||||||
* @param value The current rounding value
|
* @param value The current rounding value
|
||||||
* @return The next rounding value;
|
* @return The next rounding value;
|
||||||
*/
|
*/
|
||||||
long nextRoundingValue(long value);
|
public abstract long nextRoundingValue(long value);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Rounding strategy which is based on an interval
|
* Rounding strategy which is based on an interval
|
||||||
*
|
*
|
||||||
* {@code rounded = value - (value % interval) }
|
* {@code rounded = value - (value % interval) }
|
||||||
*/
|
*/
|
||||||
public static class Interval implements Rounding {
|
public static class Interval extends Rounding {
|
||||||
|
|
||||||
final static byte ID = 0;
|
final static byte ID = 0;
|
||||||
|
|
||||||
|
@ -77,21 +89,26 @@ public interface Rounding extends Streamable {
|
||||||
return ID;
|
return ID;
|
||||||
}
|
}
|
||||||
|
|
||||||
static long round(long value, long interval) {
|
public static long roundKey(long value, long interval) {
|
||||||
long rem = value % interval;
|
if (value < 0) {
|
||||||
// We need this condition because % may return a negative result on negative numbers
|
return (value - interval + 1) / interval;
|
||||||
// According to Google caliper's IntModBenchmark, using a condition is faster than attempts to use tricks to avoid
|
} else {
|
||||||
// the condition. Moreover, in our case, the condition is very likely to be always true (dates, prices, distances),
|
return value / interval;
|
||||||
// so easily predictable by the CPU
|
|
||||||
if (rem < 0) {
|
|
||||||
rem += interval;
|
|
||||||
}
|
}
|
||||||
return value - rem;
|
}
|
||||||
|
|
||||||
|
public static long roundValue(long key, long interval) {
|
||||||
|
return key * interval;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long round(long value) {
|
public long roundKey(long value) {
|
||||||
return round(value, interval);
|
return roundKey(value, interval);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long valueForKey(long key) {
|
||||||
|
return key * interval;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -29,9 +29,7 @@ import java.io.IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
public abstract class TimeZoneRounding implements Rounding {
|
public abstract class TimeZoneRounding extends Rounding {
|
||||||
|
|
||||||
public abstract long round(long utcMillis);
|
|
||||||
|
|
||||||
public static Builder builder(DateTimeUnit unit) {
|
public static Builder builder(DateTimeUnit unit) {
|
||||||
return new Builder(unit);
|
return new Builder(unit);
|
||||||
|
@ -148,9 +146,13 @@ public abstract class TimeZoneRounding implements Rounding {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long round(long utcMillis) {
|
public long roundKey(long utcMillis) {
|
||||||
long time = utcMillis + preTz.getOffset(utcMillis);
|
long time = utcMillis + preTz.getOffset(utcMillis);
|
||||||
time = unit.field().roundFloor(time);
|
return unit.field().roundFloor(time);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long valueForKey(long time) {
|
||||||
// now, time is still in local, move it to UTC (or the adjustLargeInterval flag is set)
|
// now, time is still in local, move it to UTC (or the adjustLargeInterval flag is set)
|
||||||
time = time - preTz.getOffset(time);
|
time = time - preTz.getOffset(time);
|
||||||
// now apply post Tz
|
// now apply post Tz
|
||||||
|
@ -160,7 +162,6 @@ public abstract class TimeZoneRounding implements Rounding {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long nextRoundingValue(long value) {
|
public long nextRoundingValue(long value) {
|
||||||
// return value + unit.field().getDurationField().getUnitMillis();
|
|
||||||
return unit.field().roundCeiling(value + 1);
|
return unit.field().roundCeiling(value + 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -198,10 +199,15 @@ public abstract class TimeZoneRounding implements Rounding {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long round(long utcMillis) {
|
public long roundKey(long utcMillis) {
|
||||||
return unit.field().roundFloor(utcMillis);
|
return unit.field().roundFloor(utcMillis);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long valueForKey(long key) {
|
||||||
|
return key;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long nextRoundingValue(long value) {
|
public long nextRoundingValue(long value) {
|
||||||
return unit.field().roundCeiling(value + 1);
|
return unit.field().roundCeiling(value + 1);
|
||||||
|
@ -241,9 +247,13 @@ public abstract class TimeZoneRounding implements Rounding {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long round(long utcMillis) {
|
public long roundKey(long utcMillis) {
|
||||||
long time = utcMillis + preTz.getOffset(utcMillis);
|
long time = utcMillis + preTz.getOffset(utcMillis);
|
||||||
time = unit.field().roundFloor(time);
|
return unit.field().roundFloor(time);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long valueForKey(long time) {
|
||||||
// after rounding, since its day level (and above), its actually UTC!
|
// after rounding, since its day level (and above), its actually UTC!
|
||||||
// now apply post Tz
|
// now apply post Tz
|
||||||
time = time + postTz.getOffset(time);
|
time = time + postTz.getOffset(time);
|
||||||
|
@ -289,8 +299,13 @@ public abstract class TimeZoneRounding implements Rounding {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long round(long utcMillis) {
|
public long roundKey(long utcMillis) {
|
||||||
return Rounding.Interval.round(utcMillis, interval);
|
return Rounding.Interval.roundKey(utcMillis, interval);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long valueForKey(long key) {
|
||||||
|
return Rounding.Interval.roundValue(key, interval);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -333,9 +348,14 @@ public abstract class TimeZoneRounding implements Rounding {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long round(long utcMillis) {
|
public long roundKey(long utcMillis) {
|
||||||
long time = utcMillis + preTz.getOffset(utcMillis);
|
long time = utcMillis + preTz.getOffset(utcMillis);
|
||||||
time = Rounding.Interval.round(time, interval);
|
return Rounding.Interval.roundKey(time, interval);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long valueForKey(long key) {
|
||||||
|
long time = Rounding.Interval.roundValue(key, interval);
|
||||||
// now, time is still in local, move it to UTC
|
// now, time is still in local, move it to UTC
|
||||||
time = time - preTz.getOffset(time);
|
time = time - preTz.getOffset(time);
|
||||||
// now apply post Tz
|
// now apply post Tz
|
||||||
|
@ -386,9 +406,14 @@ public abstract class TimeZoneRounding implements Rounding {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long round(long utcMillis) {
|
public long roundKey(long utcMillis) {
|
||||||
long time = utcMillis + preTz.getOffset(utcMillis);
|
long time = utcMillis + preTz.getOffset(utcMillis);
|
||||||
time = Rounding.Interval.round(time, interval);
|
return Rounding.Interval.roundKey(time, interval);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long valueForKey(long key) {
|
||||||
|
long time = Rounding.Interval.roundValue(key, interval);
|
||||||
// after rounding, since its day level (and above), its actually UTC!
|
// after rounding, since its day level (and above), its actually UTC!
|
||||||
// now apply post Tz
|
// now apply post Tz
|
||||||
time = time + postTz.getOffset(time);
|
time = time + postTz.getOffset(time);
|
||||||
|
@ -437,8 +462,13 @@ public abstract class TimeZoneRounding implements Rounding {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long round(long utcMillis) {
|
public long roundKey(long utcMillis) {
|
||||||
return timeZoneRounding.round((long) (factor * utcMillis));
|
return timeZoneRounding.roundKey((long) (factor * utcMillis));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long valueForKey(long key) {
|
||||||
|
return timeZoneRounding.valueForKey(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -483,8 +513,13 @@ public abstract class TimeZoneRounding implements Rounding {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long round(long utcMillis) {
|
public long roundKey(long utcMillis) {
|
||||||
return postOffset + timeZoneRounding.round(utcMillis + preOffset);
|
return timeZoneRounding.roundKey(utcMillis + preOffset);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long valueForKey(long key) {
|
||||||
|
return postOffset + timeZoneRounding.valueForKey(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -87,7 +87,7 @@ public class HistogramAggregator extends BucketsAggregator {
|
||||||
long previousKey = Long.MIN_VALUE;
|
long previousKey = Long.MIN_VALUE;
|
||||||
for (int i = 0; i < valuesCount; ++i) {
|
for (int i = 0; i < valuesCount; ++i) {
|
||||||
long value = values.nextValue();
|
long value = values.nextValue();
|
||||||
long key = rounding.round(value);
|
long key = rounding.roundKey(value);
|
||||||
assert key >= previousKey;
|
assert key >= previousKey;
|
||||||
if (key == previousKey) {
|
if (key == previousKey) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -110,7 +110,7 @@ public class HistogramAggregator extends BucketsAggregator {
|
||||||
if (ord < 0) {
|
if (ord < 0) {
|
||||||
continue; // slot is not allocated
|
continue; // slot is not allocated
|
||||||
}
|
}
|
||||||
buckets.add(histogramFactory.createBucket(bucketOrds.key(i), bucketDocCount(ord), bucketAggregations(ord)));
|
buckets.add(histogramFactory.createBucket(rounding.valueForKey(bucketOrds.key(i)), bucketDocCount(ord), bucketAggregations(ord)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue