mirror of https://github.com/apache/druid.git
Handling latest_by and earliest_by on numeric columns correctly (#15939)
* Handling latest_by and earliest_by on numeric columns correctly * Adding test
This commit is contained in:
parent
313da98879
commit
85ee775390
|
@ -62,10 +62,6 @@ public abstract class NumericFirstAggregator implements Aggregator
|
||||||
@Override
|
@Override
|
||||||
public void aggregate()
|
public void aggregate()
|
||||||
{
|
{
|
||||||
if (timeSelector.isNull()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (needsFoldCheck) {
|
if (needsFoldCheck) {
|
||||||
final Object object = valueSelector.getObject();
|
final Object object = valueSelector.getObject();
|
||||||
if (object instanceof SerializablePair) {
|
if (object instanceof SerializablePair) {
|
||||||
|
@ -84,6 +80,10 @@ public abstract class NumericFirstAggregator implements Aggregator
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (timeSelector.isNull()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
long time = timeSelector.getLong();
|
long time = timeSelector.getLong();
|
||||||
if (time < firstTime) {
|
if (time < firstTime) {
|
||||||
firstTime = time;
|
firstTime = time;
|
||||||
|
|
|
@ -97,10 +97,6 @@ public abstract class NumericFirstBufferAggregator implements BufferAggregator
|
||||||
@Override
|
@Override
|
||||||
public void aggregate(ByteBuffer buf, int position)
|
public void aggregate(ByteBuffer buf, int position)
|
||||||
{
|
{
|
||||||
if (timeSelector.isNull()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
long firstTime = buf.getLong(position);
|
long firstTime = buf.getLong(position);
|
||||||
if (needsFoldCheck) {
|
if (needsFoldCheck) {
|
||||||
final Object object = valueSelector.getObject();
|
final Object object = valueSelector.getObject();
|
||||||
|
@ -117,6 +113,10 @@ public abstract class NumericFirstBufferAggregator implements BufferAggregator
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (timeSelector.isNull()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
long time = timeSelector.getLong();
|
long time = timeSelector.getLong();
|
||||||
|
|
||||||
if (time < firstTime) {
|
if (time < firstTime) {
|
||||||
|
|
|
@ -61,10 +61,6 @@ public abstract class NumericLastAggregator implements Aggregator
|
||||||
@Override
|
@Override
|
||||||
public void aggregate()
|
public void aggregate()
|
||||||
{
|
{
|
||||||
if (timeSelector.isNull()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (needsFoldCheck) {
|
if (needsFoldCheck) {
|
||||||
final Object object = valueSelector.getObject();
|
final Object object = valueSelector.getObject();
|
||||||
if (object instanceof SerializablePair) {
|
if (object instanceof SerializablePair) {
|
||||||
|
@ -83,6 +79,11 @@ public abstract class NumericLastAggregator implements Aggregator
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (timeSelector.isNull()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
long time = timeSelector.getLong();
|
long time = timeSelector.getLong();
|
||||||
if (time >= lastTime) {
|
if (time >= lastTime) {
|
||||||
lastTime = time;
|
lastTime = time;
|
||||||
|
|
|
@ -100,10 +100,6 @@ public abstract class NumericLastBufferAggregator implements BufferAggregator
|
||||||
@Override
|
@Override
|
||||||
public void aggregate(ByteBuffer buf, int position)
|
public void aggregate(ByteBuffer buf, int position)
|
||||||
{
|
{
|
||||||
if (timeSelector.isNull()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
long lastTime = buf.getLong(position);
|
long lastTime = buf.getLong(position);
|
||||||
if (needsFoldCheck) {
|
if (needsFoldCheck) {
|
||||||
final Object object = valueSelector.getObject();
|
final Object object = valueSelector.getObject();
|
||||||
|
@ -121,6 +117,10 @@ public abstract class NumericLastBufferAggregator implements BufferAggregator
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (timeSelector.isNull()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
long time = timeSelector.getLong();
|
long time = timeSelector.getLong();
|
||||||
|
|
||||||
if (time >= lastTime) {
|
if (time >= lastTime) {
|
||||||
|
|
|
@ -15404,4 +15404,34 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
|
||||||
ImmutableList.of(new Object[]{NullHandling.sqlCompatible() ? 4L : 0L})
|
ImmutableList.of(new Object[]{NullHandling.sqlCompatible() ? 4L : 0L})
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLatestByAggregatorOnSecondaryTimestampGroupBy()
|
||||||
|
{
|
||||||
|
msqIncompatible();
|
||||||
|
testQuery(
|
||||||
|
"SELECT __time, m1, LATEST_BY(m1, MILLIS_TO_TIMESTAMP(CAST(m2 AS NUMERIC))) from druid.numfoo GROUP BY 1,2",
|
||||||
|
ImmutableList.of(
|
||||||
|
new GroupByQuery.Builder()
|
||||||
|
.setDataSource(CalciteTests.DATASOURCE3)
|
||||||
|
.setInterval(querySegmentSpec(Filtration.eternity()))
|
||||||
|
.setGranularity(Granularities.ALL)
|
||||||
|
.setDimensions(
|
||||||
|
new DefaultDimensionSpec("__time", "_d0", ColumnType.LONG),
|
||||||
|
new DefaultDimensionSpec("m1", "_d1", ColumnType.FLOAT)
|
||||||
|
)
|
||||||
|
.setAggregatorSpecs(aggregators(new FloatLastAggregatorFactory("a0", "m1", "m2")))
|
||||||
|
.setContext(OUTER_LIMIT_CONTEXT)
|
||||||
|
.build()
|
||||||
|
),
|
||||||
|
ImmutableList.of(
|
||||||
|
new Object[]{946684800000L, 1.0F, 1.0F},
|
||||||
|
new Object[]{946771200000L, 2.0F, 2.0F},
|
||||||
|
new Object[]{946857600000L, 3.0F, 3.0F},
|
||||||
|
new Object[]{978307200000L, 4.0F, 4.0F},
|
||||||
|
new Object[]{978393600000L, 5.0F, 5.0F},
|
||||||
|
new Object[]{978480000000L, 6.0F, 6.0F}
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue