mirror of https://github.com/apache/druid.git
Addressing review comments by adding the following:
1. Checking capabilities first before creating selectors 2. Removing mockito in tests for numeric first aggs 3. Removing unnecessary tests
This commit is contained in:
parent
5f65c42a51
commit
f78ca05fd6
|
@ -41,7 +41,6 @@ import org.apache.druid.segment.NilColumnValueSelector;
|
|||
import org.apache.druid.segment.column.ColumnCapabilities;
|
||||
import org.apache.druid.segment.column.ColumnHolder;
|
||||
import org.apache.druid.segment.column.ColumnType;
|
||||
import org.apache.druid.segment.vector.BaseLongVectorValueSelector;
|
||||
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
|
||||
import org.apache.druid.segment.vector.VectorValueSelector;
|
||||
|
||||
|
@ -144,15 +143,13 @@ public class DoubleFirstAggregatorFactory extends AggregatorFactory
|
|||
)
|
||||
{
|
||||
ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName);
|
||||
VectorValueSelector valueSelector = columnSelectorFactory.makeValueSelector(fieldName);
|
||||
//time is always long
|
||||
BaseLongVectorValueSelector timeSelector = (BaseLongVectorValueSelector) columnSelectorFactory.makeValueSelector(
|
||||
timeColumn);
|
||||
if (capabilities == null || capabilities.isNumeric()) {
|
||||
if (capabilities.isNumeric()) {
|
||||
VectorValueSelector valueSelector = columnSelectorFactory.makeValueSelector(fieldName);
|
||||
VectorValueSelector timeSelector = columnSelectorFactory.makeValueSelector(
|
||||
timeColumn);
|
||||
return new DoubleFirstVectorAggregator(timeSelector, valueSelector);
|
||||
} else {
|
||||
return NumericNilVectorAggregator.doubleNilVectorAggregator();
|
||||
}
|
||||
return NumericNilVectorAggregator.doubleNilVectorAggregator();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -27,12 +27,10 @@ import java.nio.ByteBuffer;
|
|||
|
||||
public class DoubleFirstVectorAggregator extends NumericFirstVectorAggregator
|
||||
{
|
||||
double firstValue;
|
||||
|
||||
public DoubleFirstVectorAggregator(VectorValueSelector timeSelector, VectorValueSelector valueSelector)
|
||||
{
|
||||
super(timeSelector, valueSelector);
|
||||
firstValue = 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -45,13 +43,13 @@ public class DoubleFirstVectorAggregator extends NumericFirstVectorAggregator
|
|||
@Override
|
||||
void putValue(ByteBuffer buf, int position, int index)
|
||||
{
|
||||
firstValue = valueSelector.getDoubleVector()[index];
|
||||
double firstValue = valueSelector.getDoubleVector()[index];
|
||||
buf.putDouble(position, firstValue);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return The primitive object stored at the position in the buffer.
|
||||
* @return The object as a pair with the position and the value stored at the position in the buffer.
|
||||
*/
|
||||
@Nullable
|
||||
@Override
|
||||
|
|
|
@ -41,7 +41,6 @@ import org.apache.druid.segment.NilColumnValueSelector;
|
|||
import org.apache.druid.segment.column.ColumnCapabilities;
|
||||
import org.apache.druid.segment.column.ColumnHolder;
|
||||
import org.apache.druid.segment.column.ColumnType;
|
||||
import org.apache.druid.segment.vector.BaseLongVectorValueSelector;
|
||||
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
|
||||
import org.apache.druid.segment.vector.VectorValueSelector;
|
||||
|
||||
|
@ -134,15 +133,12 @@ public class FloatFirstAggregatorFactory extends AggregatorFactory
|
|||
public VectorAggregator factorizeVector(VectorColumnSelectorFactory columnSelectorFactory)
|
||||
{
|
||||
ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName);
|
||||
VectorValueSelector valueSelector = columnSelectorFactory.makeValueSelector(fieldName);
|
||||
//time is always long
|
||||
BaseLongVectorValueSelector timeSelector = (BaseLongVectorValueSelector) columnSelectorFactory.makeValueSelector(
|
||||
timeColumn);
|
||||
if (capabilities == null || capabilities.isNumeric()) {
|
||||
if (capabilities.isNumeric()) {
|
||||
VectorValueSelector valueSelector = columnSelectorFactory.makeValueSelector(fieldName);
|
||||
VectorValueSelector timeSelector = columnSelectorFactory.makeValueSelector(timeColumn);
|
||||
return new FloatFirstVectorAggregator(timeSelector, valueSelector);
|
||||
} else {
|
||||
return NumericNilVectorAggregator.floatNilVectorAggregator();
|
||||
}
|
||||
return NumericNilVectorAggregator.floatNilVectorAggregator();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -27,12 +27,10 @@ import java.nio.ByteBuffer;
|
|||
|
||||
public class FloatFirstVectorAggregator extends NumericFirstVectorAggregator
|
||||
{
|
||||
float firstValue;
|
||||
|
||||
public FloatFirstVectorAggregator(VectorValueSelector timeSelector, VectorValueSelector valueSelector)
|
||||
{
|
||||
super(timeSelector, valueSelector);
|
||||
firstValue = 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -45,13 +43,13 @@ public class FloatFirstVectorAggregator extends NumericFirstVectorAggregator
|
|||
@Override
|
||||
void putValue(ByteBuffer buf, int position, int index)
|
||||
{
|
||||
firstValue = valueSelector.getFloatVector()[index];
|
||||
float firstValue = valueSelector.getFloatVector()[index];
|
||||
buf.putFloat(position, firstValue);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return The primitive object stored at the position in the buffer.
|
||||
* @return The object as a pair with the position and the value stored at the position in the buffer.
|
||||
*/
|
||||
@Nullable
|
||||
@Override
|
||||
|
|
|
@ -41,7 +41,6 @@ import org.apache.druid.segment.NilColumnValueSelector;
|
|||
import org.apache.druid.segment.column.ColumnCapabilities;
|
||||
import org.apache.druid.segment.column.ColumnHolder;
|
||||
import org.apache.druid.segment.column.ColumnType;
|
||||
import org.apache.druid.segment.vector.BaseLongVectorValueSelector;
|
||||
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
|
||||
import org.apache.druid.segment.vector.VectorValueSelector;
|
||||
|
||||
|
@ -133,14 +132,13 @@ public class LongFirstAggregatorFactory extends AggregatorFactory
|
|||
public VectorAggregator factorizeVector(VectorColumnSelectorFactory columnSelectorFactory)
|
||||
{
|
||||
ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(fieldName);
|
||||
VectorValueSelector valueSelector = columnSelectorFactory.makeValueSelector(fieldName);
|
||||
BaseLongVectorValueSelector timeSelector = (BaseLongVectorValueSelector) columnSelectorFactory.makeValueSelector(
|
||||
timeColumn);
|
||||
if (capabilities == null || capabilities.isNumeric()) {
|
||||
if (capabilities.isNumeric()) {
|
||||
VectorValueSelector valueSelector = columnSelectorFactory.makeValueSelector(fieldName);
|
||||
VectorValueSelector timeSelector = columnSelectorFactory.makeValueSelector(
|
||||
timeColumn);
|
||||
return new LongFirstVectorAggregator(timeSelector, valueSelector);
|
||||
} else {
|
||||
return NumericNilVectorAggregator.longNilVectorAggregator();
|
||||
}
|
||||
return NumericNilVectorAggregator.longNilVectorAggregator();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -27,12 +27,9 @@ import java.nio.ByteBuffer;
|
|||
|
||||
public class LongFirstVectorAggregator extends NumericFirstVectorAggregator
|
||||
{
|
||||
long firstValue;
|
||||
|
||||
public LongFirstVectorAggregator(VectorValueSelector timeSelector, VectorValueSelector valueSelector)
|
||||
{
|
||||
super(timeSelector, valueSelector);
|
||||
firstValue = 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -45,13 +42,13 @@ public class LongFirstVectorAggregator extends NumericFirstVectorAggregator
|
|||
@Override
|
||||
void putValue(ByteBuffer buf, int position, int index)
|
||||
{
|
||||
firstValue = valueSelector.getLongVector()[index];
|
||||
long firstValue = valueSelector.getLongVector()[index];
|
||||
buf.putLong(position, firstValue);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return The primitive object stored at the position in the buffer.
|
||||
* @return The object as a pair with the position and the value stored at the position in the buffer.
|
||||
*/
|
||||
@Nullable
|
||||
@Override
|
||||
|
|
|
@ -145,7 +145,7 @@ public abstract class NumericFirstVectorAggregator implements VectorAggregator
|
|||
}
|
||||
|
||||
/**
|
||||
*Updates the time only to the appropriate position in buffer as the value is null
|
||||
* Updates the time only to the appropriate position in buffer as the value is null
|
||||
*
|
||||
* @param buf byte buffer storing the byte array representation of the aggregate
|
||||
* @param position offset within the byte buffer at which the current aggregate value is stored
|
||||
|
@ -163,7 +163,7 @@ public abstract class NumericFirstVectorAggregator implements VectorAggregator
|
|||
abstract void initValue(ByteBuffer buf, int position);
|
||||
|
||||
/**
|
||||
*Abstract function which needs to be overridden by subclasses to set the
|
||||
* Abstract function which needs to be overridden by subclasses to set the
|
||||
* latest value in the buffer depending on the datatype
|
||||
*/
|
||||
abstract void putValue(ByteBuffer buf, int position, int index);
|
||||
|
|
|
@ -22,23 +22,28 @@ package org.apache.druid.query.aggregation.first;
|
|||
import org.apache.druid.common.config.NullHandling;
|
||||
import org.apache.druid.java.util.common.Pair;
|
||||
import org.apache.druid.query.aggregation.VectorAggregator;
|
||||
import org.apache.druid.query.dimension.DimensionSpec;
|
||||
import org.apache.druid.segment.column.ColumnCapabilities;
|
||||
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
|
||||
import org.apache.druid.segment.column.ColumnType;
|
||||
import org.apache.druid.segment.vector.BaseDoubleVectorValueSelector;
|
||||
import org.apache.druid.segment.vector.BaseLongVectorValueSelector;
|
||||
import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
|
||||
import org.apache.druid.segment.vector.NoFilterVectorOffset;
|
||||
import org.apache.druid.segment.vector.ReadableVectorInspector;
|
||||
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
|
||||
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
|
||||
import org.apache.druid.segment.vector.VectorObjectSelector;
|
||||
import org.apache.druid.segment.vector.VectorValueSelector;
|
||||
import org.apache.druid.testing.InitializedNullHandlingTest;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Answers;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class DoubleFirstVectorAggregationTest extends InitializedNullHandlingTest
|
||||
{
|
||||
private static final double EPSILON = 1e-5;
|
||||
|
@ -50,16 +55,15 @@ public class DoubleFirstVectorAggregationTest extends InitializedNullHandlingTes
|
|||
private static final String FIELD_NAME = "FIELD_NAME";
|
||||
private static final String TIME_COL = "__time";
|
||||
|
||||
@Mock
|
||||
private VectorValueSelector selector;
|
||||
@Mock
|
||||
|
||||
private BaseLongVectorValueSelector timeSelector;
|
||||
private ByteBuffer buf;
|
||||
|
||||
private DoubleFirstVectorAggregator target;
|
||||
|
||||
private DoubleFirstAggregatorFactory doubleFirstAggregatorFactory;
|
||||
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
|
||||
|
||||
private VectorColumnSelectorFactory selectorFactory;
|
||||
|
||||
@Before
|
||||
|
@ -68,14 +72,96 @@ public class DoubleFirstVectorAggregationTest extends InitializedNullHandlingTes
|
|||
byte[] randomBytes = new byte[1024];
|
||||
ThreadLocalRandom.current().nextBytes(randomBytes);
|
||||
buf = ByteBuffer.wrap(randomBytes);
|
||||
Mockito.doReturn(VALUES).when(selector).getDoubleVector();
|
||||
Mockito.doReturn(times).when(timeSelector).getLongVector();
|
||||
timeSelector = new BaseLongVectorValueSelector(new NoFilterVectorOffset(times.length, 0, times.length)
|
||||
{
|
||||
})
|
||||
{
|
||||
@Override
|
||||
public long[] getLongVector()
|
||||
{
|
||||
return times;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public boolean[] getNullVector()
|
||||
{
|
||||
return NULLS;
|
||||
}
|
||||
};
|
||||
selector = new BaseDoubleVectorValueSelector(new NoFilterVectorOffset(VALUES.length, 0, VALUES.length)
|
||||
{
|
||||
|
||||
})
|
||||
{
|
||||
@Override
|
||||
public double[] getDoubleVector()
|
||||
{
|
||||
return VALUES;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public boolean[] getNullVector()
|
||||
{
|
||||
if (!NullHandling.replaceWithDefault()) {
|
||||
return NULLS;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
target = new DoubleFirstVectorAggregator(timeSelector, selector);
|
||||
clearBufferForPositions(0, 0);
|
||||
selectorFactory = new VectorColumnSelectorFactory()
|
||||
{
|
||||
@Override
|
||||
public ReadableVectorInspector getReadableVectorInspector()
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SingleValueDimensionVectorSelector makeSingleValueDimensionSelector(DimensionSpec dimensionSpec)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MultiValueDimensionVectorSelector makeMultiValueDimensionSelector(DimensionSpec dimensionSpec)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public VectorValueSelector makeValueSelector(String column)
|
||||
{
|
||||
if (TIME_COL.equals(column)) {
|
||||
return timeSelector;
|
||||
} else if (FIELD_NAME.equals(column)) {
|
||||
return selector;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public VectorObjectSelector makeObjectSelector(String column)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public ColumnCapabilities getColumnCapabilities(String column)
|
||||
{
|
||||
if (FIELD_NAME.equals(column)) {
|
||||
return ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ColumnType.DOUBLE);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
Mockito.doReturn(null).when(selectorFactory).getColumnCapabilities(FIELD_NAME);
|
||||
Mockito.doReturn(selector).when(selectorFactory).makeValueSelector(FIELD_NAME);
|
||||
Mockito.doReturn(timeSelector).when(selectorFactory).makeValueSelector(TIME_COL);
|
||||
doubleFirstAggregatorFactory = new DoubleFirstAggregatorFactory(NAME, FIELD_NAME, TIME_COL);
|
||||
}
|
||||
|
||||
|
@ -108,7 +194,6 @@ public class DoubleFirstVectorAggregationTest extends InitializedNullHandlingTes
|
|||
@Test
|
||||
public void aggregateWithNulls()
|
||||
{
|
||||
mockNullsVector();
|
||||
target.aggregate(buf, 0, 0, VALUES.length);
|
||||
Pair<Long, Double> result = (Pair<Long, Double>) target.get(buf, 0);
|
||||
Assert.assertEquals(times[0], result.lhs.longValue());
|
||||
|
@ -125,7 +210,11 @@ public class DoubleFirstVectorAggregationTest extends InitializedNullHandlingTes
|
|||
for (int i = 0; i < positions.length; i++) {
|
||||
Pair<Long, Double> result = (Pair<Long, Double>) target.get(buf, positions[i] + positionOffset);
|
||||
Assert.assertEquals(times[i], result.lhs.longValue());
|
||||
Assert.assertEquals(VALUES[i], result.rhs, EPSILON);
|
||||
if (!NullHandling.replaceWithDefault() && NULLS[i]) {
|
||||
Assert.assertNull(result.rhs);
|
||||
} else {
|
||||
Assert.assertEquals(VALUES[i], result.rhs, EPSILON);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -140,7 +229,11 @@ public class DoubleFirstVectorAggregationTest extends InitializedNullHandlingTes
|
|||
for (int i = 0; i < positions.length; i++) {
|
||||
Pair<Long, Double> result = (Pair<Long, Double>) target.get(buf, positions[i] + positionOffset);
|
||||
Assert.assertEquals(times[rows[i]], result.lhs.longValue());
|
||||
Assert.assertEquals(VALUES[rows[i]], result.rhs, EPSILON);
|
||||
if (!NullHandling.replaceWithDefault() && NULLS[rows[i]]) {
|
||||
Assert.assertNull(result.rhs);
|
||||
} else {
|
||||
Assert.assertEquals(VALUES[rows[i]], result.rhs, EPSILON);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -150,11 +243,4 @@ public class DoubleFirstVectorAggregationTest extends InitializedNullHandlingTes
|
|||
target.init(buf, offset + position);
|
||||
}
|
||||
}
|
||||
|
||||
private void mockNullsVector()
|
||||
{
|
||||
if (!NullHandling.replaceWithDefault()) {
|
||||
Mockito.doReturn(NULLS).when(selector).getNullVector();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,44 +22,49 @@ package org.apache.druid.query.aggregation.first;
|
|||
import org.apache.druid.common.config.NullHandling;
|
||||
import org.apache.druid.java.util.common.Pair;
|
||||
import org.apache.druid.query.aggregation.VectorAggregator;
|
||||
import org.apache.druid.query.dimension.DimensionSpec;
|
||||
import org.apache.druid.segment.column.ColumnCapabilities;
|
||||
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
|
||||
import org.apache.druid.segment.column.ColumnType;
|
||||
import org.apache.druid.segment.vector.BaseFloatVectorValueSelector;
|
||||
import org.apache.druid.segment.vector.BaseLongVectorValueSelector;
|
||||
import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
|
||||
import org.apache.druid.segment.vector.NoFilterVectorOffset;
|
||||
import org.apache.druid.segment.vector.ReadableVectorInspector;
|
||||
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
|
||||
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
|
||||
import org.apache.druid.segment.vector.VectorObjectSelector;
|
||||
import org.apache.druid.segment.vector.VectorValueSelector;
|
||||
import org.apache.druid.testing.InitializedNullHandlingTest;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Answers;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class FloatFirstVectorAggregationTest extends InitializedNullHandlingTest
|
||||
{
|
||||
private static final double EPSILON = 1e-5;
|
||||
private static final float[] VALUES = new float[]{7.2f, 15.6f, 2.1f, 150.0f};
|
||||
private static final boolean[] NULLS = new boolean[]{true, false, true, false};
|
||||
private static final boolean[] NULLS = new boolean[]{false, false, true, false};
|
||||
private long[] times = {2436, 6879, 7888, 8224};
|
||||
|
||||
private static final String NAME = "NAME";
|
||||
private static final String FIELD_NAME = "FIELD_NAME";
|
||||
private static final String TIME_COL = "__time";
|
||||
|
||||
@Mock
|
||||
|
||||
private VectorValueSelector selector;
|
||||
@Mock
|
||||
|
||||
private BaseLongVectorValueSelector timeSelector;
|
||||
private ByteBuffer buf;
|
||||
|
||||
private FloatFirstVectorAggregator target;
|
||||
|
||||
private FloatFirstAggregatorFactory floatFirstAggregatorFactory;
|
||||
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
|
||||
|
||||
private VectorColumnSelectorFactory selectorFactory;
|
||||
|
||||
@Before
|
||||
|
@ -68,14 +73,97 @@ public class FloatFirstVectorAggregationTest extends InitializedNullHandlingTest
|
|||
byte[] randomBytes = new byte[1024];
|
||||
ThreadLocalRandom.current().nextBytes(randomBytes);
|
||||
buf = ByteBuffer.wrap(randomBytes);
|
||||
Mockito.doReturn(VALUES).when(selector).getFloatVector();
|
||||
Mockito.doReturn(times).when(timeSelector).getLongVector();
|
||||
timeSelector = new BaseLongVectorValueSelector(new NoFilterVectorOffset(times.length, 0, times.length)
|
||||
{
|
||||
})
|
||||
{
|
||||
@Override
|
||||
public long[] getLongVector()
|
||||
{
|
||||
return times;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public boolean[] getNullVector()
|
||||
{
|
||||
return NULLS;
|
||||
}
|
||||
};
|
||||
selector = new BaseFloatVectorValueSelector(new NoFilterVectorOffset(VALUES.length, 0, VALUES.length)
|
||||
{
|
||||
|
||||
})
|
||||
{
|
||||
|
||||
@Override
|
||||
public float[] getFloatVector()
|
||||
{
|
||||
return VALUES;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public boolean[] getNullVector()
|
||||
{
|
||||
if (!NullHandling.replaceWithDefault()) {
|
||||
return NULLS;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
target = new FloatFirstVectorAggregator(timeSelector, selector);
|
||||
clearBufferForPositions(0, 0);
|
||||
|
||||
Mockito.doReturn(null).when(selectorFactory).getColumnCapabilities(FIELD_NAME);
|
||||
Mockito.doReturn(selector).when(selectorFactory).makeValueSelector(FIELD_NAME);
|
||||
Mockito.doReturn(timeSelector).when(selectorFactory).makeValueSelector(TIME_COL);
|
||||
selectorFactory = new VectorColumnSelectorFactory()
|
||||
{
|
||||
@Override
|
||||
public ReadableVectorInspector getReadableVectorInspector()
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SingleValueDimensionVectorSelector makeSingleValueDimensionSelector(DimensionSpec dimensionSpec)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MultiValueDimensionVectorSelector makeMultiValueDimensionSelector(DimensionSpec dimensionSpec)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public VectorValueSelector makeValueSelector(String column)
|
||||
{
|
||||
if (TIME_COL.equals(column)) {
|
||||
return timeSelector;
|
||||
} else if (FIELD_NAME.equals(column)) {
|
||||
return selector;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public VectorObjectSelector makeObjectSelector(String column)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public ColumnCapabilities getColumnCapabilities(String column)
|
||||
{
|
||||
if (FIELD_NAME.equals(column)) {
|
||||
return ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ColumnType.FLOAT);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
};
|
||||
floatFirstAggregatorFactory = new FloatFirstAggregatorFactory(NAME, FIELD_NAME, TIME_COL);
|
||||
|
||||
}
|
||||
|
@ -110,16 +198,10 @@ public class FloatFirstVectorAggregationTest extends InitializedNullHandlingTest
|
|||
@Test
|
||||
public void aggregateWithNulls()
|
||||
{
|
||||
mockNullsVector();
|
||||
target.aggregate(buf, 0, 0, VALUES.length);
|
||||
Pair<Long, Float> result = (Pair<Long, Float>) target.get(buf, 0);
|
||||
if (!NullHandling.replaceWithDefault()) {
|
||||
Assert.assertEquals(times[1], result.lhs.longValue());
|
||||
Assert.assertEquals(VALUES[1], result.rhs, EPSILON);
|
||||
} else {
|
||||
Assert.assertEquals(times[0], result.lhs.longValue());
|
||||
Assert.assertEquals(VALUES[0], result.rhs, EPSILON);
|
||||
}
|
||||
Assert.assertEquals(times[0], result.lhs.longValue());
|
||||
Assert.assertEquals(VALUES[0], result.rhs, EPSILON);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -132,7 +214,11 @@ public class FloatFirstVectorAggregationTest extends InitializedNullHandlingTest
|
|||
for (int i = 0; i < positions.length; i++) {
|
||||
Pair<Long, Float> result = (Pair<Long, Float>) target.get(buf, positions[i] + positionOffset);
|
||||
Assert.assertEquals(times[i], result.lhs.longValue());
|
||||
Assert.assertEquals(VALUES[i], result.rhs, EPSILON);
|
||||
if (!NullHandling.replaceWithDefault() && NULLS[i]) {
|
||||
Assert.assertNull(result.rhs);
|
||||
} else {
|
||||
Assert.assertEquals(VALUES[i], result.rhs, EPSILON);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -147,7 +233,11 @@ public class FloatFirstVectorAggregationTest extends InitializedNullHandlingTest
|
|||
for (int i = 0; i < positions.length; i++) {
|
||||
Pair<Long, Float> result = (Pair<Long, Float>) target.get(buf, positions[i] + positionOffset);
|
||||
Assert.assertEquals(times[rows[i]], result.lhs.longValue());
|
||||
Assert.assertEquals(VALUES[rows[i]], result.rhs, EPSILON);
|
||||
if (!NullHandling.replaceWithDefault() && NULLS[rows[i]]) {
|
||||
Assert.assertNull(result.rhs);
|
||||
} else {
|
||||
Assert.assertEquals(VALUES[rows[i]], result.rhs, EPSILON);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -157,11 +247,4 @@ public class FloatFirstVectorAggregationTest extends InitializedNullHandlingTest
|
|||
target.init(buf, offset + position);
|
||||
}
|
||||
}
|
||||
|
||||
private void mockNullsVector()
|
||||
{
|
||||
if (!NullHandling.replaceWithDefault()) {
|
||||
Mockito.doReturn(NULLS).when(selector).getNullVector();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,24 +22,28 @@ package org.apache.druid.query.aggregation.first;
|
|||
import org.apache.druid.common.config.NullHandling;
|
||||
import org.apache.druid.java.util.common.Pair;
|
||||
import org.apache.druid.query.aggregation.VectorAggregator;
|
||||
import org.apache.druid.query.dimension.DimensionSpec;
|
||||
import org.apache.druid.segment.column.ColumnCapabilities;
|
||||
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
|
||||
import org.apache.druid.segment.column.ColumnType;
|
||||
import org.apache.druid.segment.vector.BaseLongVectorValueSelector;
|
||||
import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
|
||||
import org.apache.druid.segment.vector.NoFilterVectorOffset;
|
||||
import org.apache.druid.segment.vector.ReadableVectorInspector;
|
||||
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
|
||||
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
|
||||
import org.apache.druid.segment.vector.VectorObjectSelector;
|
||||
import org.apache.druid.segment.vector.VectorValueSelector;
|
||||
import org.apache.druid.testing.InitializedNullHandlingTest;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Answers;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class LongFirstVectorAggregationTest extends InitializedNullHandlingTest
|
||||
{
|
||||
private static final double EPSILON = 1e-5;
|
||||
|
@ -49,15 +53,12 @@ public class LongFirstVectorAggregationTest extends InitializedNullHandlingTest
|
|||
private static final String FIELD_NAME = "FIELD_NAME";
|
||||
private static final String TIME_COL = "__time";
|
||||
private long[] times = {2436, 6879, 7888, 8224};
|
||||
@Mock
|
||||
private VectorValueSelector selector;
|
||||
@Mock
|
||||
private BaseLongVectorValueSelector timeSelector;
|
||||
private ByteBuffer buf;
|
||||
private LongFirstVectorAggregator target;
|
||||
|
||||
private LongFirstAggregatorFactory longFirstAggregatorFactory;
|
||||
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
|
||||
private VectorColumnSelectorFactory selectorFactory;
|
||||
|
||||
@Before
|
||||
|
@ -66,17 +67,97 @@ public class LongFirstVectorAggregationTest extends InitializedNullHandlingTest
|
|||
byte[] randomBytes = new byte[1024];
|
||||
ThreadLocalRandom.current().nextBytes(randomBytes);
|
||||
buf = ByteBuffer.wrap(randomBytes);
|
||||
Mockito.doReturn(VALUES).when(selector).getLongVector();
|
||||
Mockito.doReturn(times).when(timeSelector).getLongVector();
|
||||
timeSelector = new BaseLongVectorValueSelector(new NoFilterVectorOffset(times.length, 0, times.length)
|
||||
{
|
||||
})
|
||||
{
|
||||
@Override
|
||||
public long[] getLongVector()
|
||||
{
|
||||
return times;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public boolean[] getNullVector()
|
||||
{
|
||||
return NULLS;
|
||||
}
|
||||
};
|
||||
selector = new BaseLongVectorValueSelector(new NoFilterVectorOffset(VALUES.length, 0, VALUES.length)
|
||||
{
|
||||
|
||||
})
|
||||
{
|
||||
@Override
|
||||
public long[] getLongVector()
|
||||
{
|
||||
return VALUES;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public boolean[] getNullVector()
|
||||
{
|
||||
if (!NullHandling.replaceWithDefault()) {
|
||||
return NULLS;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
target = new LongFirstVectorAggregator(timeSelector, selector);
|
||||
clearBufferForPositions(0, 0);
|
||||
|
||||
selectorFactory = new VectorColumnSelectorFactory()
|
||||
{
|
||||
@Override
|
||||
public ReadableVectorInspector getReadableVectorInspector()
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
Mockito.doReturn(null).when(selectorFactory).getColumnCapabilities(FIELD_NAME);
|
||||
Mockito.doReturn(selector).when(selectorFactory).makeValueSelector(FIELD_NAME);
|
||||
Mockito.doReturn(timeSelector).when(selectorFactory).makeValueSelector(TIME_COL);
|
||||
@Override
|
||||
public SingleValueDimensionVectorSelector makeSingleValueDimensionSelector(DimensionSpec dimensionSpec)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MultiValueDimensionVectorSelector makeMultiValueDimensionSelector(DimensionSpec dimensionSpec)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public VectorValueSelector makeValueSelector(String column)
|
||||
{
|
||||
if (TIME_COL.equals(column)) {
|
||||
return timeSelector;
|
||||
} else if (FIELD_NAME.equals(column)) {
|
||||
return selector;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public VectorObjectSelector makeObjectSelector(String column)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public ColumnCapabilities getColumnCapabilities(String column)
|
||||
{
|
||||
if (FIELD_NAME.equals(column)) {
|
||||
return ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ColumnType.LONG);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
};
|
||||
longFirstAggregatorFactory = new LongFirstAggregatorFactory(NAME, FIELD_NAME, TIME_COL);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -108,7 +189,6 @@ public class LongFirstVectorAggregationTest extends InitializedNullHandlingTest
|
|||
@Test
|
||||
public void aggregateWithNulls()
|
||||
{
|
||||
mockNullsVector();
|
||||
target.aggregate(buf, 0, 0, VALUES.length);
|
||||
Pair<Long, Long> result = (Pair<Long, Long>) target.get(buf, 0);
|
||||
Assert.assertEquals(times[0], result.lhs.longValue());
|
||||
|
@ -125,7 +205,11 @@ public class LongFirstVectorAggregationTest extends InitializedNullHandlingTest
|
|||
for (int i = 0; i < positions.length; i++) {
|
||||
Pair<Long, Long> result = (Pair<Long, Long>) target.get(buf, positions[i] + positionOffset);
|
||||
Assert.assertEquals(times[i], result.lhs.longValue());
|
||||
Assert.assertEquals(VALUES[i], result.rhs, EPSILON);
|
||||
if (!NullHandling.replaceWithDefault() && NULLS[i]) {
|
||||
Assert.assertNull(result.rhs);
|
||||
} else {
|
||||
Assert.assertEquals(VALUES[i], result.rhs, EPSILON);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -140,7 +224,11 @@ public class LongFirstVectorAggregationTest extends InitializedNullHandlingTest
|
|||
for (int i = 0; i < positions.length; i++) {
|
||||
Pair<Long, Long> result = (Pair<Long, Long>) target.get(buf, positions[i] + positionOffset);
|
||||
Assert.assertEquals(times[rows[i]], result.lhs.longValue());
|
||||
Assert.assertEquals(VALUES[rows[i]], result.rhs, EPSILON);
|
||||
if (!NullHandling.replaceWithDefault() && NULLS[rows[i]]) {
|
||||
Assert.assertNull(result.rhs);
|
||||
} else {
|
||||
Assert.assertEquals(VALUES[rows[i]], result.rhs, EPSILON);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -150,11 +238,4 @@ public class LongFirstVectorAggregationTest extends InitializedNullHandlingTest
|
|||
target.init(buf, offset + position);
|
||||
}
|
||||
}
|
||||
|
||||
private void mockNullsVector()
|
||||
{
|
||||
if (!NullHandling.replaceWithDefault()) {
|
||||
Mockito.doReturn(NULLS).when(selector).getNullVector();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1314,42 +1314,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
|
|||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOffHeapEarliestGroupBy()
|
||||
{
|
||||
notMsqCompatible();
|
||||
|
||||
testQuery(
|
||||
"SELECT dim2, EARLIEST(m1) AS val1 FROM foo GROUP BY dim2",
|
||||
ImmutableList.of(
|
||||
GroupByQuery.builder()
|
||||
.setDataSource(CalciteTests.DATASOURCE1)
|
||||
.setInterval(querySegmentSpec(Filtration.eternity()))
|
||||
.setGranularity(Granularities.ALL)
|
||||
.setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0")))
|
||||
.setAggregatorSpecs(aggregators(
|
||||
new FloatFirstAggregatorFactory("a0", "m1", null)
|
||||
)
|
||||
)
|
||||
.setContext(QUERY_CONTEXT_DEFAULT)
|
||||
.build()
|
||||
),
|
||||
NullHandling.sqlCompatible()
|
||||
? ImmutableList.of(
|
||||
new Object[]{null, 2.0f},
|
||||
new Object[]{"", 3.0f},
|
||||
new Object[]{"a", 1.0f},
|
||||
new Object[]{"abc", 5.0f}
|
||||
)
|
||||
: ImmutableList.of(
|
||||
new Object[]{"", 2.0f},
|
||||
new Object[]{"a", 1.0f},
|
||||
new Object[]{"abc", 5.0f}
|
||||
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEarliestAggregatorsNumericNulls()
|
||||
{
|
||||
|
@ -13985,39 +13949,4 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
|
|||
)
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEarliestVectorAggregators()
|
||||
{
|
||||
notMsqCompatible();
|
||||
|
||||
testQuery(
|
||||
"SELECT "
|
||||
+ "EARLIEST(cnt), EARLIEST(cnt + 1), EARLIEST(m1), EARLIEST(m1+1) "
|
||||
+ "FROM druid.numfoo",
|
||||
ImmutableList.of(
|
||||
Druids.newTimeseriesQueryBuilder()
|
||||
.dataSource(CalciteTests.DATASOURCE3)
|
||||
.intervals(querySegmentSpec(Filtration.eternity()))
|
||||
.granularity(Granularities.ALL)
|
||||
.virtualColumns(
|
||||
expressionVirtualColumn("v0", "(\"cnt\" + 1)", ColumnType.LONG),
|
||||
expressionVirtualColumn("v1", "(\"m1\" + 1)", ColumnType.FLOAT)
|
||||
)
|
||||
.aggregators(
|
||||
aggregators(
|
||||
new LongFirstAggregatorFactory("a0", "cnt", null),
|
||||
new LongFirstAggregatorFactory("a1", "v0", null),
|
||||
new FloatFirstAggregatorFactory("a2", "m1", null),
|
||||
new FloatFirstAggregatorFactory("a3", "v1", null)
|
||||
)
|
||||
)
|
||||
.context(QUERY_CONTEXT_DEFAULT)
|
||||
.build()
|
||||
),
|
||||
ImmutableList.of(
|
||||
new Object[]{1L, 2L, 1.0f, 2.0f}
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue