mirror of https://github.com/apache/druid.git
Compressed Big Decimal Cleanup and Extension (#13048)
1. remove unnecessary generic type from CompressedBigDecimal 2. support Number input types 3. support aggregator reading supported input types directly (uningested data) 4. fix scaling bug in buffer aggregator
This commit is contained in:
parent
fd6c05eee8
commit
54a2eb7dcc
|
@ -70,7 +70,6 @@
|
|||
</option>
|
||||
<option name="IGNORE_FIELDS_USED_IN_MULTIPLE_METHODS" value="true" />
|
||||
</inspection_tool>
|
||||
<inspection_tool class="FieldMayBeFinal" enabled="true" level="WARNING" enabled_by_default="true" />
|
||||
<inspection_tool class="FinalStaticMethod" enabled="true" level="ERROR" enabled_by_default="true" />
|
||||
<inspection_tool class="FlowJSError" enabled="false" level="Non-TeamCity Error" enabled_by_default="false" />
|
||||
<inspection_tool class="ForCanBeForeach" enabled="true" level="WARNING" enabled_by_default="true">
|
||||
|
@ -372,12 +371,12 @@
|
|||
<constraint name="l" within="" contains="" />
|
||||
<constraint name="y" minCount="0" maxCount="2147483647" within="" contains="" />
|
||||
</searchConfiguration>
|
||||
<searchConfiguration name="Assign an ExecutorService instance to an ExecutorService variable, not an Executor variable" text="$x$ = $y$;" recursive="true" caseInsensitive="true" type="JAVA" >
|
||||
<searchConfiguration name="Assign an ExecutorService instance to an ExecutorService variable, not an Executor variable" text="$x$ = $y$;" recursive="true" caseInsensitive="true" type="JAVA">
|
||||
<constraint name="__context__" within="" contains="" />
|
||||
<constraint name="x" nameOfExprType="java\.util\.concurrent\.Executor" within="" contains="" />
|
||||
<constraint name="y" nameOfExprType="java\.util\.concurrent\.ExecutorService" exprTypeWithinHierarchy="true" within="" contains="" />
|
||||
</searchConfiguration>
|
||||
<searchConfiguration name="Intialize an ExecutorService instance to an ExecutorService variable, not an Executor variable" text="Executor $x$ = $y$;" recursive="true" caseInsensitive="true" type="JAVA" >
|
||||
<searchConfiguration name="Intialize an ExecutorService instance to an ExecutorService variable, not an Executor variable" text="Executor $x$ = $y$;" recursive="true" caseInsensitive="true" type="JAVA">
|
||||
<constraint name="__context__" within="" contains="" />
|
||||
<constraint name="x" within="" contains="" />
|
||||
<constraint name="y" nameOfExprType="java\.util\.concurrent\.ExecutorService" exprTypeWithinHierarchy="true" within="" contains="" />
|
||||
|
@ -453,7 +452,7 @@
|
|||
<inspection_tool class="UnnecessaryCallToStringValueOf" enabled="true" level="ERROR" enabled_by_default="true" />
|
||||
<inspection_tool class="UnnecessaryEnumModifier" enabled="true" level="ERROR" enabled_by_default="true" />
|
||||
<inspection_tool class="UnnecessaryFullyQualifiedName" enabled="true" level="WARNING" enabled_by_default="true">
|
||||
<scope name="NonGeneratedFiles" level="ERROR" enabled="true">
|
||||
<scope name="NonGeneratedFiles" level="ERROR" enabled="true" editorAttributes="NOT_USED_ELEMENT_ATTRIBUTES">
|
||||
<option name="m_ignoreJavadoc" value="true" />
|
||||
<option name="ignoreInModuleStatements" value="true" />
|
||||
</scope>
|
||||
|
@ -461,6 +460,10 @@
|
|||
<option name="ignoreInModuleStatements" value="true" />
|
||||
</inspection_tool>
|
||||
<inspection_tool class="UnnecessaryInterfaceModifier" enabled="true" level="ERROR" enabled_by_default="true" />
|
||||
<inspection_tool class="UnnecessaryLocalVariable" enabled="false" level="WARNING" enabled_by_default="false">
|
||||
<option name="m_ignoreImmediatelyReturnedVariables" value="false" />
|
||||
<option name="m_ignoreAnnotatedVariables" value="false" />
|
||||
</inspection_tool>
|
||||
<inspection_tool class="UnnecessaryToStringCall" enabled="true" level="ERROR" enabled_by_default="true" />
|
||||
<inspection_tool class="UnusedAssignment" enabled="true" level="ERROR" enabled_by_default="true">
|
||||
<option name="REPORT_PREFIX_EXPRESSIONS" value="true" />
|
||||
|
@ -480,8 +483,8 @@
|
|||
<inspection_tool class="XmlHighlighting" enabled="true" level="Non-TeamCity Warning" enabled_by_default="true" />
|
||||
<inspection_tool class="XmlInvalidId" enabled="true" level="Non-TeamCity Error" enabled_by_default="true" />
|
||||
<inspection_tool class="XmlPathReference" enabled="true" level="Non-TeamCity Error" enabled_by_default="true" />
|
||||
<inspection_tool class="unused" enabled="true" level="WARNING" enabled_by_default="true" isSelected="false">
|
||||
<scope name="UnusedInspectionsScope" level="ERROR" enabled="true" isSelected="false">
|
||||
<inspection_tool class="unused" enabled="true" level="WARNING" enabled_by_default="true" checkParameterExcludingHierarchy="false" isSelected="false">
|
||||
<scope name="UnusedInspectionsScope" level="ERROR" enabled="true" editorAttributes="NOT_USED_ELEMENT_ATTRIBUTES" checkParameterExcludingHierarchy="false" isSelected="false">
|
||||
<option name="LOCAL_VARIABLE" value="true" />
|
||||
<option name="FIELD" value="true" />
|
||||
<option name="METHOD" value="true" />
|
||||
|
|
|
@ -26,8 +26,7 @@ import java.math.BigInteger;
|
|||
/**
|
||||
* A compressed big decimal that holds its data with an embedded array.
|
||||
*/
|
||||
@SuppressWarnings("serial")
|
||||
public class ArrayCompressedBigDecimal extends CompressedBigDecimal<ArrayCompressedBigDecimal>
|
||||
public class ArrayCompressedBigDecimal extends CompressedBigDecimal
|
||||
{
|
||||
|
||||
private static final int BYTE_MASK = 0xff;
|
||||
|
@ -85,7 +84,7 @@ public class ArrayCompressedBigDecimal extends CompressedBigDecimal<ArrayCompres
|
|||
*
|
||||
* @param initVal the initial value
|
||||
*/
|
||||
public ArrayCompressedBigDecimal(CompressedBigDecimal<?> initVal)
|
||||
public ArrayCompressedBigDecimal(CompressedBigDecimal initVal)
|
||||
{
|
||||
super(initVal.getScale());
|
||||
this.array = new int[initVal.getArraySize()];
|
||||
|
@ -134,6 +133,12 @@ public class ArrayCompressedBigDecimal extends CompressedBigDecimal<ArrayCompres
|
|||
return new ArrayCompressedBigDecimal(arr, scale);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompressedBigDecimal toHeap()
|
||||
{
|
||||
return this;
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
* @see org.apache.druid.compressedbigdecimal.CompressedBigDecimal#getArraySize()
|
||||
*/
|
||||
|
|
|
@ -24,8 +24,7 @@ import java.nio.ByteBuffer;
|
|||
/**
|
||||
* A compressed big decimal that holds its data with an embedded array.
|
||||
*/
|
||||
@SuppressWarnings("serial")
|
||||
public class ByteBufferCompressedBigDecimal extends CompressedBigDecimal<ByteBufferCompressedBigDecimal>
|
||||
public class ByteBufferCompressedBigDecimal extends CompressedBigDecimal
|
||||
{
|
||||
|
||||
private final ByteBuffer buf;
|
||||
|
@ -57,7 +56,7 @@ public class ByteBufferCompressedBigDecimal extends CompressedBigDecimal<ByteBuf
|
|||
* @param position the position in the ByteBuffer
|
||||
* @param val initial value
|
||||
*/
|
||||
public ByteBufferCompressedBigDecimal(ByteBuffer buf, int position, CompressedBigDecimal<?> val)
|
||||
public ByteBufferCompressedBigDecimal(ByteBuffer buf, int position, CompressedBigDecimal val)
|
||||
{
|
||||
super(val.getScale());
|
||||
this.buf = buf;
|
||||
|
@ -67,6 +66,12 @@ public class ByteBufferCompressedBigDecimal extends CompressedBigDecimal<ByteBuf
|
|||
copyToBuffer(buf, position, size, val);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompressedBigDecimal toHeap()
|
||||
{
|
||||
return new ArrayCompressedBigDecimal(this);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
* @see org.apache.druid.compressedbigdecimal.CompressedBigDecimal#getArraySize()
|
||||
*/
|
||||
|
@ -100,6 +105,7 @@ public class ByteBufferCompressedBigDecimal extends CompressedBigDecimal<ByteBuf
|
|||
buf.putInt(position + idx * Integer.BYTES, val);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Copy a compressed big decimal into a Bytebuffer in a format understood by this class.
|
||||
*
|
||||
|
@ -108,7 +114,7 @@ public class ByteBufferCompressedBigDecimal extends CompressedBigDecimal<ByteBuf
|
|||
* @param size The space (in number of ints) allocated for the value
|
||||
* @param val THe value to copy
|
||||
*/
|
||||
public static void copyToBuffer(ByteBuffer buf, int position, int size, CompressedBigDecimal<?> val)
|
||||
public static void copyToBuffer(ByteBuffer buf, int position, int size, CompressedBigDecimal val)
|
||||
{
|
||||
if (val.getArraySize() > size) {
|
||||
throw new IllegalArgumentException("Right hand side too big to fit in the result value");
|
||||
|
|
|
@ -27,12 +27,8 @@ import java.util.function.ToIntBiFunction;
|
|||
/**
|
||||
* Mutable big decimal value that can be used to accumulate values without losing precision or reallocating memory.
|
||||
* This helps in revenue based calculations
|
||||
*
|
||||
* @param <T> Type of actual derived class that contains the underlying data
|
||||
*/
|
||||
@SuppressWarnings("serial")
|
||||
public abstract class CompressedBigDecimal<T extends CompressedBigDecimal<T>> extends Number
|
||||
implements Comparable<CompressedBigDecimal<T>>
|
||||
public abstract class CompressedBigDecimal extends Number implements Comparable<CompressedBigDecimal>
|
||||
{
|
||||
|
||||
private static final long INT_MASK = 0x00000000ffffffffL;
|
||||
|
@ -59,11 +55,10 @@ public abstract class CompressedBigDecimal<T extends CompressedBigDecimal<T>> ex
|
|||
* than this value (the result), then the higher order bits are dropped, similar to
|
||||
* what happens when adding a long to an int and storing the result in an int.
|
||||
*
|
||||
* @param <S> type of compressedbigdecimal to accumulate
|
||||
* @param rhs The object to accumulate
|
||||
* @return a reference to <b>this</b>
|
||||
*/
|
||||
public <S extends CompressedBigDecimal<S>> CompressedBigDecimal<T> accumulate(CompressedBigDecimal<S> rhs)
|
||||
public CompressedBigDecimal accumulate(CompressedBigDecimal rhs)
|
||||
{
|
||||
if (rhs.scale != scale) {
|
||||
throw new IllegalArgumentException("Cannot accumulate MutableBigDecimals with differing scales");
|
||||
|
@ -72,7 +67,8 @@ public abstract class CompressedBigDecimal<T extends CompressedBigDecimal<T>> ex
|
|||
throw new IllegalArgumentException("Right hand side too big to fit in the result value");
|
||||
}
|
||||
internalAdd(getArraySize(), this, CompressedBigDecimal::getArrayEntry, CompressedBigDecimal::setArrayEntry,
|
||||
rhs.getArraySize(), rhs, CompressedBigDecimal::getArrayEntry);
|
||||
rhs.getArraySize(), rhs, CompressedBigDecimal::getArrayEntry
|
||||
);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -81,7 +77,7 @@ public abstract class CompressedBigDecimal<T extends CompressedBigDecimal<T>> ex
|
|||
*
|
||||
* @return this
|
||||
*/
|
||||
public CompressedBigDecimal<T> reset()
|
||||
public CompressedBigDecimal reset()
|
||||
{
|
||||
for (int ii = 0; ii < getArraySize(); ++ii) {
|
||||
setArrayEntry(ii, 0);
|
||||
|
@ -105,8 +101,15 @@ public abstract class CompressedBigDecimal<T extends CompressedBigDecimal<T>> ex
|
|||
* @param rhs the object containing the right array data
|
||||
* @param rhsGet method reference to get an underlying right value
|
||||
*/
|
||||
static <R, S> void internalAdd(int llen, R lhs, ToIntBiFunction<R, Integer> lhsGet, ObjBiIntConsumer<R> lhsSet,
|
||||
int rlen, S rhs, ToIntBiFunction<S, Integer> rhsGet)
|
||||
static <R, S> void internalAdd(
|
||||
int llen,
|
||||
R lhs,
|
||||
ToIntBiFunction<R, Integer> lhsGet,
|
||||
ObjBiIntConsumer<R> lhsSet,
|
||||
int rlen,
|
||||
S rhs,
|
||||
ToIntBiFunction<S, Integer> rhsGet
|
||||
)
|
||||
{
|
||||
int commonLen = Integer.min(llen, rlen);
|
||||
long carry = 0;
|
||||
|
@ -179,6 +182,11 @@ public abstract class CompressedBigDecimal<T extends CompressedBigDecimal<T>> ex
|
|||
return scale;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return a version of this object that is on heap. Returns this if already on-heap
|
||||
*/
|
||||
public abstract CompressedBigDecimal toHeap();
|
||||
|
||||
/**
|
||||
* Return the array size.
|
||||
*
|
||||
|
@ -239,6 +247,7 @@ public abstract class CompressedBigDecimal<T extends CompressedBigDecimal<T>> ex
|
|||
* -1 if Negative
|
||||
* 0 if Zero
|
||||
* 1 if Positive
|
||||
*
|
||||
* @param <S> type of object containing the array
|
||||
* @param size the underlying array size
|
||||
* @param rhs object that contains the underlying array
|
||||
|
@ -266,15 +275,26 @@ public abstract class CompressedBigDecimal<T extends CompressedBigDecimal<T>> ex
|
|||
* @see java.lang.Comparable#compareTo(java.lang.Object)
|
||||
*/
|
||||
@Override
|
||||
public int compareTo(CompressedBigDecimal<T> o)
|
||||
public int compareTo(CompressedBigDecimal o)
|
||||
{
|
||||
|
||||
if (this.equals(o)) {
|
||||
if (super.equals(o)) {
|
||||
return 0;
|
||||
}
|
||||
return this.toBigDecimal().compareTo(o.toBigDecimal());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return toBigDecimal().hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj)
|
||||
{
|
||||
return obj instanceof CompressedBigDecimal && toBigDecimal().equals(((CompressedBigDecimal) obj).toBigDecimal());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the value of the specified number as an {@code int},
|
||||
* which may involve rounding or truncation.
|
||||
|
|
|
@ -28,18 +28,18 @@ import javax.annotation.Nullable;
|
|||
/**
|
||||
* AggregateCombiner for CompressedBigDecimals.
|
||||
*/
|
||||
public class CompressedBigDecimalAggregateCombiner implements AggregateCombiner<CompressedBigDecimal<?>>
|
||||
public class CompressedBigDecimalAggregateCombiner implements AggregateCombiner<CompressedBigDecimal>
|
||||
{
|
||||
private CompressedBigDecimal<?> sum;
|
||||
private CompressedBigDecimal sum;
|
||||
|
||||
@Override
|
||||
public void reset(@SuppressWarnings("rawtypes") ColumnValueSelector columnValueSelector)
|
||||
{
|
||||
@SuppressWarnings("unchecked")
|
||||
ColumnValueSelector<CompressedBigDecimal<?>> selector =
|
||||
(ColumnValueSelector<CompressedBigDecimal<?>>) columnValueSelector;
|
||||
ColumnValueSelector<CompressedBigDecimal> selector =
|
||||
(ColumnValueSelector<CompressedBigDecimal>) columnValueSelector;
|
||||
|
||||
CompressedBigDecimal<?> cbd = selector.getObject();
|
||||
CompressedBigDecimal cbd = selector.getObject();
|
||||
if (sum == null) {
|
||||
sum = new ArrayCompressedBigDecimal(cbd);
|
||||
} else {
|
||||
|
@ -52,10 +52,10 @@ public class CompressedBigDecimalAggregateCombiner implements AggregateCombiner<
|
|||
public void fold(@SuppressWarnings("rawtypes") ColumnValueSelector columnValueSelector)
|
||||
{
|
||||
@SuppressWarnings("unchecked")
|
||||
ColumnValueSelector<CompressedBigDecimal<?>> selector =
|
||||
(ColumnValueSelector<CompressedBigDecimal<?>>) columnValueSelector;
|
||||
ColumnValueSelector<CompressedBigDecimal> selector =
|
||||
(ColumnValueSelector<CompressedBigDecimal>) columnValueSelector;
|
||||
|
||||
CompressedBigDecimal<?> cbd = selector.getObject();
|
||||
CompressedBigDecimal cbd = selector.getObject();
|
||||
|
||||
if (sum == null) {
|
||||
sum = new ArrayCompressedBigDecimal(cbd);
|
||||
|
@ -86,15 +86,14 @@ public class CompressedBigDecimalAggregateCombiner implements AggregateCombiner<
|
|||
|
||||
@Nullable
|
||||
@Override
|
||||
public CompressedBigDecimal<?> getObject()
|
||||
public CompressedBigDecimal getObject()
|
||||
{
|
||||
return sum;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public Class<CompressedBigDecimal<?>> classOfObject()
|
||||
public Class<CompressedBigDecimal> classOfObject()
|
||||
{
|
||||
return (Class<CompressedBigDecimal<?>>) (Class<?>) CompressedBigDecimal.class;
|
||||
return CompressedBigDecimal.class;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,8 +28,8 @@ import org.apache.druid.segment.ColumnValueSelector;
|
|||
public class CompressedBigDecimalAggregator implements Aggregator
|
||||
{
|
||||
|
||||
private final ColumnValueSelector<CompressedBigDecimal<?>> selector;
|
||||
private final CompressedBigDecimal<?> sum;
|
||||
private final ColumnValueSelector<CompressedBigDecimal> selector;
|
||||
private final CompressedBigDecimal sum;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
|
@ -41,7 +41,7 @@ public class CompressedBigDecimalAggregator implements Aggregator
|
|||
public CompressedBigDecimalAggregator(
|
||||
int size,
|
||||
int scale,
|
||||
ColumnValueSelector<CompressedBigDecimal<?>> selector
|
||||
ColumnValueSelector<CompressedBigDecimal> selector
|
||||
)
|
||||
{
|
||||
this.selector = selector;
|
||||
|
@ -54,10 +54,10 @@ public class CompressedBigDecimalAggregator implements Aggregator
|
|||
@Override
|
||||
public void aggregate()
|
||||
{
|
||||
CompressedBigDecimal<?> selectedObject = selector.getObject();
|
||||
CompressedBigDecimal selectedObject = Utils.objToCompressedBigDecimal(selector.getObject());
|
||||
if (selectedObject != null) {
|
||||
if (selectedObject.getScale() != sum.getScale()) {
|
||||
selectedObject = Utils.scaleUp(selectedObject);
|
||||
selectedObject = Utils.scaleUp(selectedObject, sum.getScale());
|
||||
}
|
||||
sum.accumulate(selectedObject);
|
||||
}
|
||||
|
|
|
@ -44,22 +44,14 @@ import java.util.List;
|
|||
* An aggregator factory to generate longSum aggregator object.
|
||||
*/
|
||||
public class CompressedBigDecimalAggregatorFactory
|
||||
extends NullableNumericAggregatorFactory<ColumnValueSelector<CompressedBigDecimal<?>>>
|
||||
extends NullableNumericAggregatorFactory<ColumnValueSelector<CompressedBigDecimal>>
|
||||
{
|
||||
|
||||
public static final int DEFAULT_SCALE = 9;
|
||||
public static final int DEFAULT_SIZE = 3;
|
||||
private static final byte CACHE_TYPE_ID = 0x37;
|
||||
|
||||
public static final Comparator<CompressedBigDecimal<?>> COMPARATOR = new Comparator<CompressedBigDecimal<?>>()
|
||||
{
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
@Override
|
||||
public int compare(CompressedBigDecimal lhs, CompressedBigDecimal rhs)
|
||||
{
|
||||
return lhs.compareTo(rhs);
|
||||
}
|
||||
};
|
||||
public static final Comparator<CompressedBigDecimal> COMPARATOR = CompressedBigDecimal::compareTo;
|
||||
|
||||
private final String name;
|
||||
private final String fieldName;
|
||||
|
@ -90,21 +82,21 @@ public class CompressedBigDecimalAggregatorFactory
|
|||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
protected ColumnValueSelector<CompressedBigDecimal<?>> selector(ColumnSelectorFactory metricFactory)
|
||||
protected ColumnValueSelector<CompressedBigDecimal> selector(ColumnSelectorFactory metricFactory)
|
||||
{
|
||||
return (ColumnValueSelector<CompressedBigDecimal<?>>) metricFactory.makeColumnValueSelector(fieldName);
|
||||
return (ColumnValueSelector<CompressedBigDecimal>) metricFactory.makeColumnValueSelector(fieldName);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Aggregator factorize(ColumnSelectorFactory metricFactory,
|
||||
@Nonnull ColumnValueSelector<CompressedBigDecimal<?>> selector)
|
||||
@Nonnull ColumnValueSelector<CompressedBigDecimal> selector)
|
||||
{
|
||||
return new CompressedBigDecimalAggregator(size, scale, selector);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory,
|
||||
@Nonnull ColumnValueSelector<CompressedBigDecimal<?>> selector)
|
||||
@Nonnull ColumnValueSelector<CompressedBigDecimal> selector)
|
||||
{
|
||||
return new CompressedBigDecimalBufferAggregator(size, scale, selector);
|
||||
}
|
||||
|
@ -113,7 +105,7 @@ public class CompressedBigDecimalAggregatorFactory
|
|||
* @see org.apache.druid.query.aggregation.AggregatorFactory#getComparator()
|
||||
*/
|
||||
@Override
|
||||
public Comparator<CompressedBigDecimal<?>> getComparator()
|
||||
public Comparator<CompressedBigDecimal> getComparator()
|
||||
{
|
||||
return COMPARATOR;
|
||||
}
|
||||
|
@ -137,9 +129,9 @@ public class CompressedBigDecimalAggregatorFactory
|
|||
// due to truncation when the deserialized objects aren't big enough to hold the accumlated result.
|
||||
// The most common case this avoids is deserializing 0E-9 into a CompressedBigDecimal with array
|
||||
// size 1 and then accumulating a larger value into it.
|
||||
CompressedBigDecimal<?> retVal = ArrayCompressedBigDecimal.allocate(size, scale);
|
||||
CompressedBigDecimal<?> left = (CompressedBigDecimal<?>) lhs;
|
||||
CompressedBigDecimal<?> right = (CompressedBigDecimal<?>) rhs;
|
||||
CompressedBigDecimal retVal = ArrayCompressedBigDecimal.allocate(size, scale);
|
||||
CompressedBigDecimal left = (CompressedBigDecimal) lhs;
|
||||
CompressedBigDecimal right = (CompressedBigDecimal) rhs;
|
||||
if (left.signum() != 0) {
|
||||
retVal.accumulate(left);
|
||||
}
|
||||
|
@ -160,7 +152,7 @@ public class CompressedBigDecimalAggregatorFactory
|
|||
}
|
||||
|
||||
@Override
|
||||
public AggregateCombiner<CompressedBigDecimal<?>> makeAggregateCombiner()
|
||||
public AggregateCombiner<CompressedBigDecimal> makeAggregateCombiner()
|
||||
{
|
||||
return new CompressedBigDecimalAggregateCombiner();
|
||||
}
|
||||
|
@ -240,7 +232,7 @@ public class CompressedBigDecimalAggregatorFactory
|
|||
@Override
|
||||
public Object finalizeComputation(Object object)
|
||||
{
|
||||
CompressedBigDecimal<?> compressedBigDecimal = (CompressedBigDecimal<?>) object;
|
||||
CompressedBigDecimal compressedBigDecimal = (CompressedBigDecimal) object;
|
||||
BigDecimal bigDecimal = compressedBigDecimal.toBigDecimal();
|
||||
return bigDecimal.compareTo(BigDecimal.ZERO) == 0 ? 0 : bigDecimal;
|
||||
}
|
||||
|
|
|
@ -32,7 +32,7 @@ public class CompressedBigDecimalBufferAggregator implements BufferAggregator
|
|||
|
||||
//Cache will hold the aggregated value.
|
||||
//We are using ByteBuffer to hold the key to the aggregated value.
|
||||
private final ColumnValueSelector<CompressedBigDecimal<?>> selector;
|
||||
private final ColumnValueSelector<CompressedBigDecimal> selector;
|
||||
private final int size;
|
||||
private final int scale;
|
||||
|
||||
|
@ -46,7 +46,7 @@ public class CompressedBigDecimalBufferAggregator implements BufferAggregator
|
|||
public CompressedBigDecimalBufferAggregator(
|
||||
int size,
|
||||
int scale,
|
||||
ColumnValueSelector<CompressedBigDecimal<?>> selector
|
||||
ColumnValueSelector<CompressedBigDecimal> selector
|
||||
)
|
||||
{
|
||||
this.selector = selector;
|
||||
|
@ -71,7 +71,7 @@ public class CompressedBigDecimalBufferAggregator implements BufferAggregator
|
|||
@Override
|
||||
public void aggregate(ByteBuffer buf, int position)
|
||||
{
|
||||
CompressedBigDecimal<?> addend = selector.getObject();
|
||||
CompressedBigDecimal addend = Utils.objToCompressedBigDecimal(selector.getObject());
|
||||
if (addend != null) {
|
||||
Utils.accumulate(buf, position, size, scale, addend);
|
||||
}
|
||||
|
@ -83,7 +83,16 @@ public class CompressedBigDecimalBufferAggregator implements BufferAggregator
|
|||
@Override
|
||||
public Object get(ByteBuffer buf, int position)
|
||||
{
|
||||
return new ByteBufferCompressedBigDecimal(buf, position, size, scale);
|
||||
ByteBufferCompressedBigDecimal byteBufferCompressedBigDecimal = new ByteBufferCompressedBigDecimal(
|
||||
buf,
|
||||
position,
|
||||
size,
|
||||
scale
|
||||
);
|
||||
|
||||
CompressedBigDecimal heapCompressedBigDecimal = byteBufferCompressedBigDecimal.toHeap();
|
||||
|
||||
return heapCompressedBigDecimal;
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
|
|
|
@ -68,7 +68,7 @@ public class CompressedBigDecimalColumn implements ComplexColumn
|
|||
}
|
||||
|
||||
@Override
|
||||
public CompressedBigDecimal<?> getRowValue(int rowNum)
|
||||
public CompressedBigDecimal getRowValue(int rowNum)
|
||||
{
|
||||
int s = scale.get(rowNum);
|
||||
|
||||
|
@ -91,20 +91,20 @@ public class CompressedBigDecimalColumn implements ComplexColumn
|
|||
}
|
||||
|
||||
@Override
|
||||
public ColumnValueSelector<?> makeColumnValueSelector(final ReadableOffset offset)
|
||||
public ColumnValueSelector makeColumnValueSelector(final ReadableOffset offset)
|
||||
{
|
||||
return new ObjectColumnSelector<CompressedBigDecimal<?>>()
|
||||
return new ObjectColumnSelector<CompressedBigDecimal>()
|
||||
{
|
||||
@Override @Nullable
|
||||
public CompressedBigDecimal<?> getObject()
|
||||
public CompressedBigDecimal getObject()
|
||||
{
|
||||
return getRowValue(offset.getOffset());
|
||||
}
|
||||
|
||||
@Override @SuppressWarnings("unchecked")
|
||||
public Class<CompressedBigDecimal<?>> classOfObject()
|
||||
public Class<CompressedBigDecimal> classOfObject()
|
||||
{
|
||||
return (Class<CompressedBigDecimal<?>>) (Class<?>) CompressedBigDecimal.class;
|
||||
return (Class<CompressedBigDecimal>) (Class) CompressedBigDecimal.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -28,7 +28,6 @@ import java.io.IOException;
|
|||
/**
|
||||
* CompressedBigDecimal json serializer.
|
||||
*/
|
||||
@SuppressWarnings("rawtypes")
|
||||
public class CompressedBigDecimalJsonSerializer extends JsonSerializer<CompressedBigDecimal>
|
||||
{
|
||||
|
||||
|
|
|
@ -36,7 +36,7 @@ import java.util.Locale;
|
|||
/**
|
||||
* Column Serializer that understands converting CompressedBigDecimal to 4 byte long values for better storage.
|
||||
*/
|
||||
public class CompressedBigDecimalLongColumnSerializer implements GenericColumnSerializer<CompressedBigDecimal<?>>
|
||||
public class CompressedBigDecimalLongColumnSerializer implements GenericColumnSerializer<CompressedBigDecimal>
|
||||
{
|
||||
private static final byte VERSION = CompressedBigDecimalColumnPartSupplier.VERSION;
|
||||
|
||||
|
@ -92,9 +92,9 @@ public class CompressedBigDecimalLongColumnSerializer implements GenericColumnSe
|
|||
}
|
||||
|
||||
@Override
|
||||
public void serialize(ColumnValueSelector<? extends CompressedBigDecimal<?>> obj) throws IOException
|
||||
public void serialize(ColumnValueSelector<? extends CompressedBigDecimal> obj) throws IOException
|
||||
{
|
||||
CompressedBigDecimal<?> abd = obj.getObject();
|
||||
CompressedBigDecimal abd = obj.getObject();
|
||||
int[] array = new int[abd.getArraySize()];
|
||||
for (int ii = 0; ii < abd.getArraySize(); ++ii) {
|
||||
array[ii] = abd.getArrayEntry(ii);
|
||||
|
|
|
@ -20,14 +20,12 @@
|
|||
package org.apache.druid.compressedbigdecimal;
|
||||
|
||||
import org.apache.druid.data.input.InputRow;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.segment.column.ColumnBuilder;
|
||||
import org.apache.druid.segment.data.ObjectStrategy;
|
||||
import org.apache.druid.segment.serde.ComplexMetricExtractor;
|
||||
import org.apache.druid.segment.serde.ComplexMetricSerde;
|
||||
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
/**
|
||||
|
@ -36,7 +34,7 @@ import java.nio.ByteBuffer;
|
|||
public class CompressedBigDecimalMetricSerde extends ComplexMetricSerde
|
||||
{
|
||||
|
||||
private CompressedBigDecimalObjectStrategy strategy = new CompressedBigDecimalObjectStrategy();
|
||||
private final CompressedBigDecimalObjectStrategy strategy = new CompressedBigDecimalObjectStrategy();
|
||||
|
||||
/* (non-Javadoc)
|
||||
* @see ComplexMetricSerde#getTypeName()
|
||||
|
@ -48,32 +46,22 @@ public class CompressedBigDecimalMetricSerde extends ComplexMetricSerde
|
|||
}
|
||||
|
||||
@Override
|
||||
public ComplexMetricExtractor<CompressedBigDecimal<?>> getExtractor()
|
||||
public ComplexMetricExtractor<CompressedBigDecimal> getExtractor()
|
||||
{
|
||||
return new ComplexMetricExtractor<CompressedBigDecimal<?>>()
|
||||
return new ComplexMetricExtractor<CompressedBigDecimal>()
|
||||
{
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public Class<CompressedBigDecimal<?>> extractedClass()
|
||||
public Class<CompressedBigDecimal> extractedClass()
|
||||
{
|
||||
return (Class<CompressedBigDecimal<?>>) (Class<?>) CompressedBigDecimal.class;
|
||||
return CompressedBigDecimal.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompressedBigDecimal<?> extractValue(InputRow inputRow, String metricName)
|
||||
public CompressedBigDecimal extractValue(InputRow inputRow, String metricName)
|
||||
{
|
||||
Object rawMetric = inputRow.getRaw(metricName);
|
||||
if (rawMetric == null) {
|
||||
return null;
|
||||
} else if (rawMetric instanceof BigDecimal) {
|
||||
return new ArrayCompressedBigDecimal((BigDecimal) rawMetric);
|
||||
} else if (rawMetric instanceof String) {
|
||||
return new ArrayCompressedBigDecimal(new BigDecimal((String) rawMetric));
|
||||
} else if (rawMetric instanceof CompressedBigDecimal<?>) {
|
||||
return (CompressedBigDecimal<?>) rawMetric;
|
||||
} else {
|
||||
throw new ISE("Unknown extraction value type: [%s]", rawMetric.getClass().getSimpleName());
|
||||
}
|
||||
|
||||
return Utils.objToCompressedBigDecimal(rawMetric);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -95,7 +83,8 @@ public class CompressedBigDecimalMetricSerde extends ComplexMetricSerde
|
|||
@Override
|
||||
public CompressedBigDecimalLongColumnSerializer getSerializer(
|
||||
SegmentWriteOutMedium segmentWriteOutMedium,
|
||||
String column)
|
||||
String column
|
||||
)
|
||||
{
|
||||
return CompressedBigDecimalLongColumnSerializer.create(segmentWriteOutMedium, column);
|
||||
}
|
||||
|
@ -104,7 +93,7 @@ public class CompressedBigDecimalMetricSerde extends ComplexMetricSerde
|
|||
* @see ComplexMetricSerde#getObjectStrategy()
|
||||
*/
|
||||
@Override
|
||||
public ObjectStrategy<CompressedBigDecimal<?>> getObjectStrategy()
|
||||
public ObjectStrategy<CompressedBigDecimal> getObjectStrategy()
|
||||
{
|
||||
return strategy;
|
||||
}
|
||||
|
|
|
@ -28,7 +28,7 @@ import java.nio.IntBuffer;
|
|||
/**
|
||||
* Defines strategy on how to read and write data from deep storage.
|
||||
*/
|
||||
public class CompressedBigDecimalObjectStrategy implements ObjectStrategy<CompressedBigDecimal<?>>
|
||||
public class CompressedBigDecimalObjectStrategy implements ObjectStrategy<CompressedBigDecimal>
|
||||
{
|
||||
|
||||
/*
|
||||
|
@ -61,7 +61,7 @@ public class CompressedBigDecimalObjectStrategy implements ObjectStrategy<Compre
|
|||
* @see org.apache.druid.segment.data.ObjectStrategy#fromByteBuffer(java.nio.ByteBuffer, int)
|
||||
*/
|
||||
@Override
|
||||
public CompressedBigDecimal<?> fromByteBuffer(ByteBuffer buffer, int numBytes)
|
||||
public CompressedBigDecimal fromByteBuffer(ByteBuffer buffer, int numBytes)
|
||||
{
|
||||
ByteBuffer myBuf = buffer.slice();
|
||||
myBuf.order(ByteOrder.LITTLE_ENDIAN);
|
||||
|
@ -79,7 +79,7 @@ public class CompressedBigDecimalObjectStrategy implements ObjectStrategy<Compre
|
|||
* @see org.apache.druid.segment.data.ObjectStrategy#toBytes(java.lang.Object)
|
||||
*/
|
||||
@Override
|
||||
public byte[] toBytes(CompressedBigDecimal<?> val)
|
||||
public byte[] toBytes(CompressedBigDecimal val)
|
||||
{
|
||||
ByteBuffer buf = ByteBuffer.allocate(4 * (val.getArraySize() + 1));
|
||||
buf.order(ByteOrder.LITTLE_ENDIAN);
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.apache.druid.compressedbigdecimal;
|
||||
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.segment.data.IndexedInts;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
|
@ -37,15 +38,13 @@ public class Utils
|
|||
* than this value (the result), then the higher order bits are dropped, similar to
|
||||
* what happens when adding a long to an int and storing the result in an int.
|
||||
*
|
||||
* @param <S> Type of CompressedBigDecimal into which to accumulate
|
||||
* @param lhs The object into which to accumulate
|
||||
* @param rhs The object to accumulate
|
||||
* @return a reference to <b>this</b>
|
||||
*/
|
||||
public static <S extends CompressedBigDecimal<S>>
|
||||
CompressedBigDecimal<S> accumulate(CompressedBigDecimal<S> lhs, BigDecimal rhs)
|
||||
public static CompressedBigDecimal accumulate(CompressedBigDecimal lhs, BigDecimal rhs)
|
||||
{
|
||||
CompressedBigDecimal<ArrayCompressedBigDecimal> abd =
|
||||
CompressedBigDecimal abd =
|
||||
new ArrayCompressedBigDecimal(rhs.setScale(lhs.getScale()));
|
||||
return lhs.accumulate(abd);
|
||||
}
|
||||
|
@ -58,36 +57,33 @@ public class Utils
|
|||
* than this value (the result), then the higher order bits are dropped, similar to
|
||||
* what happens when adding a long to an int and storing the result in an int.
|
||||
*
|
||||
* @param <S> Type of CompressedBigDecimal into which to accumulate
|
||||
* @param lhs The object into which to accumulate
|
||||
* @param rhs The object to accumulate
|
||||
* @param rhsScale The scale to apply to the long being accumulated
|
||||
* @return a reference to <b>this</b>
|
||||
*/
|
||||
public static <S extends CompressedBigDecimal<S>>
|
||||
CompressedBigDecimal<S> accumulate(CompressedBigDecimal<S> lhs, long rhs, int rhsScale)
|
||||
public static CompressedBigDecimal accumulate(CompressedBigDecimal lhs, long rhs, int rhsScale)
|
||||
{
|
||||
CompressedBigDecimal<ArrayCompressedBigDecimal> abd = new ArrayCompressedBigDecimal(rhs, rhsScale);
|
||||
CompressedBigDecimal abd = new ArrayCompressedBigDecimal(rhs, rhsScale);
|
||||
return lhs.accumulate(abd);
|
||||
}
|
||||
|
||||
/**
|
||||
* Accumulate using IndexedInts read from Druid's segment file.
|
||||
*
|
||||
* @param <S> Type of CompressedBigDecimal into which to accumulate
|
||||
* @param lhs The object into which to accumulate
|
||||
* @param rhs IndexedInts representing array of magnitude values
|
||||
* @param rhsScale the scale
|
||||
* @return a reference to <b>this</b>
|
||||
*/
|
||||
public static <S extends CompressedBigDecimal<S>>
|
||||
CompressedBigDecimal<S> accumulate(CompressedBigDecimal<S> lhs, IndexedInts rhs, int rhsScale)
|
||||
public static CompressedBigDecimal accumulate(CompressedBigDecimal lhs, IndexedInts rhs, int rhsScale)
|
||||
{
|
||||
if (rhs.size() > lhs.getArraySize()) {
|
||||
throw new IllegalArgumentException("Right hand side too big to fit in the result value");
|
||||
}
|
||||
CompressedBigDecimal.internalAdd(lhs.getArraySize(), lhs, CompressedBigDecimal::getArrayEntry,
|
||||
CompressedBigDecimal::setArrayEntry, rhs.size(), rhs, IndexedInts::get);
|
||||
CompressedBigDecimal::setArrayEntry, rhs.size(), rhs, IndexedInts::get
|
||||
);
|
||||
return lhs;
|
||||
}
|
||||
|
||||
|
@ -100,45 +96,79 @@ public class Utils
|
|||
* @param lhsScale The scale of the left
|
||||
* @param rhs the right side to accumlate
|
||||
*/
|
||||
public static void accumulate(ByteBuffer buf, int pos, int lhsSize, int lhsScale, CompressedBigDecimal<?> rhs)
|
||||
public static void accumulate(ByteBuffer buf, int pos, int lhsSize, int lhsScale, CompressedBigDecimal rhs)
|
||||
{
|
||||
if (rhs.getArraySize() > lhsSize) {
|
||||
throw new IllegalArgumentException("Right hand side too big to fit in the result value");
|
||||
}
|
||||
BufferAccessor accessor = BufferAccessor.prepare(pos);
|
||||
CompressedBigDecimal.internalAdd(lhsSize, buf, accessor, accessor,
|
||||
rhs.getArraySize(), rhs, CompressedBigDecimal::getArrayEntry);
|
||||
if (rhs.getScale() != lhsScale) {
|
||||
rhs = Utils.scaleUp(rhs);
|
||||
}
|
||||
CompressedBigDecimal.internalAdd(
|
||||
lhsSize,
|
||||
buf,
|
||||
accessor,
|
||||
accessor,
|
||||
rhs.getArraySize(),
|
||||
rhs,
|
||||
CompressedBigDecimal::getArrayEntry
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a {@code CompressedBigDecimal} whose scale is moderated as per the default scale.
|
||||
*
|
||||
* @param <S> Type of CompressedBigDecimal to scale
|
||||
* @param val The value to scale up
|
||||
* @return Scaled up compressedBigDecimal
|
||||
*/
|
||||
public static <S extends CompressedBigDecimal<S>>
|
||||
CompressedBigDecimal<ArrayCompressedBigDecimal> scaleUp(CompressedBigDecimal<S> val)
|
||||
public static CompressedBigDecimal scaleUp(CompressedBigDecimal val)
|
||||
{
|
||||
return new ArrayCompressedBigDecimal(
|
||||
val.toBigDecimal().setScale(CompressedBigDecimalAggregatorFactory.DEFAULT_SCALE, BigDecimal.ROUND_UP)
|
||||
);
|
||||
}
|
||||
|
||||
public static CompressedBigDecimal scaleUp(CompressedBigDecimal val, int scale)
|
||||
{
|
||||
return new ArrayCompressedBigDecimal(
|
||||
val.toBigDecimal().setScale(scale, BigDecimal.ROUND_UP)
|
||||
);
|
||||
}
|
||||
|
||||
public static CompressedBigDecimal objToCompressedBigDecimal(Object obj)
|
||||
{
|
||||
CompressedBigDecimal result;
|
||||
if (obj == null) {
|
||||
result = null;
|
||||
} else if (obj instanceof BigDecimal) {
|
||||
result = new ArrayCompressedBigDecimal((BigDecimal) obj);
|
||||
} else if (obj instanceof Long) {
|
||||
result = new ArrayCompressedBigDecimal(new BigDecimal((Long) obj));
|
||||
} else if (obj instanceof Integer) {
|
||||
result = new ArrayCompressedBigDecimal(new BigDecimal((Integer) obj));
|
||||
} else if (obj instanceof Double) {
|
||||
result = new ArrayCompressedBigDecimal(BigDecimal.valueOf((Double) obj));
|
||||
} else if (obj instanceof Float) {
|
||||
result = new ArrayCompressedBigDecimal(BigDecimal.valueOf((Float) obj));
|
||||
} else if (obj instanceof String) {
|
||||
result = new ArrayCompressedBigDecimal(new BigDecimal((String) obj));
|
||||
} else if (obj instanceof CompressedBigDecimal) {
|
||||
result = (CompressedBigDecimal) obj;
|
||||
} else {
|
||||
throw new ISE("Unknown extraction value type: [%s]", obj.getClass().getSimpleName());
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper class that maintains a cache of thread local objects that can be used to access
|
||||
* a ByteBuffer in {@link Utils#accumulate(ByteBuffer, int, int, int, CompressedBigDecimal)}.
|
||||
*/
|
||||
private static class BufferAccessor implements ToIntBiFunction<ByteBuffer, Integer>, ObjBiIntConsumer<ByteBuffer>
|
||||
{
|
||||
private static ThreadLocal<BufferAccessor> cache = new ThreadLocal<BufferAccessor>()
|
||||
{
|
||||
@Override
|
||||
protected BufferAccessor initialValue()
|
||||
{
|
||||
return new BufferAccessor();
|
||||
}
|
||||
};
|
||||
private static final ThreadLocal<BufferAccessor> CACHE = ThreadLocal.withInitial(BufferAccessor::new);
|
||||
|
||||
private int position = 0;
|
||||
|
||||
|
@ -150,7 +180,7 @@ public class Utils
|
|||
*/
|
||||
public static BufferAccessor prepare(int position)
|
||||
{
|
||||
BufferAccessor accessor = cache.get();
|
||||
BufferAccessor accessor = CACHE.get();
|
||||
accessor.position = position;
|
||||
return accessor;
|
||||
}
|
||||
|
|
|
@ -24,15 +24,13 @@ import org.apache.druid.segment.data.ColumnarInts;
|
|||
import org.apache.druid.segment.data.ColumnarMultiInts;
|
||||
import org.apache.druid.segment.data.ReadableOffset;
|
||||
import org.easymock.EasyMock;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertSame;
|
||||
|
||||
public class AggregatorCombinerFactoryTest
|
||||
{
|
||||
|
@ -46,10 +44,10 @@ public class AggregatorCombinerFactoryTest
|
|||
ColumnarInts ci = EasyMock.createMock(ColumnarInts.class);
|
||||
ReadableOffset ro = EasyMock.createMock(ReadableOffset.class);
|
||||
CompressedBigDecimalColumn cbr = new CompressedBigDecimalColumn(ci, cmi);
|
||||
assertEquals(CompressedBigDecimalModule.COMPRESSED_BIG_DECIMAL, cbr.getTypeName());
|
||||
assertEquals(0, cbr.getLength());
|
||||
assertEquals(CompressedBigDecimalColumn.class, cbr.getClazz());
|
||||
assertNotNull(cbr.makeColumnValueSelector(ro));
|
||||
Assert.assertEquals(CompressedBigDecimalModule.COMPRESSED_BIG_DECIMAL, cbr.getTypeName());
|
||||
Assert.assertEquals(0, cbr.getLength());
|
||||
Assert.assertEquals(CompressedBigDecimalColumn.class, cbr.getClazz());
|
||||
Assert.assertNotNull(cbr.makeColumnValueSelector(ro));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -59,23 +57,35 @@ public class AggregatorCombinerFactoryTest
|
|||
public void testCompressedBigDecimalAggregatorFactory()
|
||||
{
|
||||
CompressedBigDecimalAggregatorFactory cf = new CompressedBigDecimalAggregatorFactory("name", "fieldName", 9, 0);
|
||||
assertEquals("CompressedBigDecimalAggregatorFactory{name='name', type='compressedBigDecimal', fieldName='fieldName', requiredFields='[fieldName]', size='9', scale='0'}", cf.toString());
|
||||
assertNotNull(cf.getCacheKey());
|
||||
assertNull(cf.deserialize(null));
|
||||
assertEquals("5", cf.deserialize(new BigDecimal(5)).toString());
|
||||
assertEquals("5", cf.deserialize(5d).toString());
|
||||
assertEquals("5", cf.deserialize("5").toString());
|
||||
assertEquals("[CompressedBigDecimalAggregatorFactory{name='fieldName', type='compressedBigDecimal', fieldName='fieldName', requiredFields='[fieldName]', size='9', scale='0'}]", Arrays.toString(cf.getRequiredColumns().toArray()));
|
||||
assertEquals("0", cf.combine(null, null).toString());
|
||||
assertEquals("4", cf.combine(new BigDecimal(4), null).toString());
|
||||
assertEquals("4", cf.combine(null, new BigDecimal(4)).toString());
|
||||
assertEquals("8", cf.combine(new ArrayCompressedBigDecimal(new BigDecimal(4)), new ArrayCompressedBigDecimal(new BigDecimal(4))).toString());
|
||||
Assert.assertEquals(
|
||||
"CompressedBigDecimalAggregatorFactory{name='name', type='compressedBigDecimal', fieldName='fieldName', requiredFields='[fieldName]', size='9', scale='0'}",
|
||||
cf.toString()
|
||||
);
|
||||
Assert.assertNotNull(cf.getCacheKey());
|
||||
Assert.assertNull(cf.deserialize(null));
|
||||
Assert.assertEquals("5", cf.deserialize(new BigDecimal(5)).toString());
|
||||
Assert.assertEquals("5", cf.deserialize(5d).toString());
|
||||
Assert.assertEquals("5", cf.deserialize("5").toString());
|
||||
Assert.assertEquals(
|
||||
"[CompressedBigDecimalAggregatorFactory{name='fieldName', type='compressedBigDecimal', fieldName='fieldName', requiredFields='[fieldName]', size='9', scale='0'}]",
|
||||
Arrays.toString(cf.getRequiredColumns().toArray())
|
||||
);
|
||||
Assert.assertEquals("0", cf.combine(null, null).toString());
|
||||
Assert.assertEquals("4", cf.combine(new BigDecimal(4), null).toString());
|
||||
Assert.assertEquals("4", cf.combine(null, new BigDecimal(4)).toString());
|
||||
Assert.assertEquals(
|
||||
"8",
|
||||
cf.combine(
|
||||
new ArrayCompressedBigDecimal(new BigDecimal(4)),
|
||||
new ArrayCompressedBigDecimal(new BigDecimal(4))
|
||||
).toString()
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test method for {@link CompressedBigDecimalAggregatorFactory#deserialize(Object)}.
|
||||
*/
|
||||
@Test (expected = RuntimeException.class)
|
||||
@Test(expected = RuntimeException.class)
|
||||
public void testCompressedBigDecimalAggregatorFactoryDeserialize()
|
||||
{
|
||||
CompressedBigDecimalAggregatorFactory cf = new CompressedBigDecimalAggregatorFactory("name", "fieldName", 9, 0);
|
||||
|
@ -85,10 +95,10 @@ public class AggregatorCombinerFactoryTest
|
|||
/**
|
||||
* Test method for {@link CompressedBigDecimalBufferAggregator#getFloat(ByteBuffer, int)}
|
||||
*/
|
||||
@Test (expected = UnsupportedOperationException.class)
|
||||
@Test(expected = UnsupportedOperationException.class)
|
||||
public void testCompressedBigDecimalBufferAggregatorGetFloat()
|
||||
{
|
||||
ColumnValueSelector cs = EasyMock.createMock(ColumnValueSelector.class);
|
||||
ColumnValueSelector<CompressedBigDecimal> cs = EasyMock.createMock(ColumnValueSelector.class);
|
||||
ByteBuffer bbuf = ByteBuffer.allocate(10);
|
||||
CompressedBigDecimalBufferAggregator ca = new CompressedBigDecimalBufferAggregator(4, 0, cs);
|
||||
ca.getFloat(bbuf, 0);
|
||||
|
@ -97,10 +107,10 @@ public class AggregatorCombinerFactoryTest
|
|||
/**
|
||||
* Test method for {@link CompressedBigDecimalBufferAggregator#getLong(ByteBuffer, int)}
|
||||
*/
|
||||
@Test (expected = UnsupportedOperationException.class)
|
||||
@Test(expected = UnsupportedOperationException.class)
|
||||
public void testCompressedBigDecimalBufferAggregatorGetLong()
|
||||
{
|
||||
ColumnValueSelector cs = EasyMock.createMock(ColumnValueSelector.class);
|
||||
ColumnValueSelector<CompressedBigDecimal> cs = EasyMock.createMock(ColumnValueSelector.class);
|
||||
ByteBuffer bbuf = ByteBuffer.allocate(10);
|
||||
CompressedBigDecimalBufferAggregator ca = new CompressedBigDecimalBufferAggregator(4, 0, cs);
|
||||
ca.getLong(bbuf, 0);
|
||||
|
@ -114,7 +124,7 @@ public class AggregatorCombinerFactoryTest
|
|||
{
|
||||
CompressedBigDecimalAggregateCombiner cc = new CompressedBigDecimalAggregateCombiner();
|
||||
CompressedBigDecimal c = cc.getObject();
|
||||
assertSame(null, c);
|
||||
Assert.assertSame(null, c);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -124,7 +134,7 @@ public class AggregatorCombinerFactoryTest
|
|||
public void testCompressedBigDecimalAggregateCombinerClassofObject()
|
||||
{
|
||||
CompressedBigDecimalAggregateCombiner cc = new CompressedBigDecimalAggregateCombiner();
|
||||
assertSame(CompressedBigDecimalAggregateCombiner.class, cc.getClass());
|
||||
Assert.assertSame(CompressedBigDecimalAggregateCombiner.class, cc.getClass());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -29,8 +29,12 @@ import org.apache.druid.query.groupby.GroupByQuery;
|
|||
import org.apache.druid.query.groupby.GroupByQueryConfig;
|
||||
import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
|
||||
import org.apache.druid.query.groupby.ResultRow;
|
||||
import org.hamcrest.collection.IsCollectionWithSize;
|
||||
import org.hamcrest.collection.IsMapContaining;
|
||||
import org.hamcrest.collection.IsMapWithSize;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.DateTimeZone;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
@ -48,11 +52,6 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.TimeZone;
|
||||
|
||||
import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
|
||||
import static org.hamcrest.collection.IsMapContaining.hasEntry;
|
||||
import static org.hamcrest.collection.IsMapWithSize.aMapWithSize;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
/**
|
||||
* Unit tests for AccumulatingDecimalAggregator.
|
||||
|
@ -88,10 +87,7 @@ public class CompressedBigDecimalAggregatorGroupByTest
|
|||
{
|
||||
final List<Object[]> constructors = new ArrayList<>();
|
||||
for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) {
|
||||
if ("v2ParallelCombine".equals(config.toString())) {
|
||||
continue;
|
||||
}
|
||||
constructors.add(new Object[] {config});
|
||||
constructors.add(new Object[]{config});
|
||||
}
|
||||
return constructors;
|
||||
}
|
||||
|
@ -122,7 +118,8 @@ public class CompressedBigDecimalAggregatorGroupByTest
|
|||
|
||||
Sequence<ResultRow> seq = helper.createIndexAndRunQueryOnSegment(
|
||||
this.getClass().getResourceAsStream("/" + "bd_test_data.csv"),
|
||||
Resources.asCharSource(this.getClass().getResource(
|
||||
Resources.asCharSource(
|
||||
this.getClass().getResource(
|
||||
"/" + "bd_test_data_parser.json"),
|
||||
StandardCharsets.UTF_8
|
||||
).read(),
|
||||
|
@ -137,14 +134,30 @@ public class CompressedBigDecimalAggregatorGroupByTest
|
|||
);
|
||||
|
||||
List<ResultRow> results = seq.toList();
|
||||
assertThat(results, hasSize(1));
|
||||
Assert.assertThat(results, IsCollectionWithSize.hasSize(1));
|
||||
ResultRow row = results.get(0);
|
||||
ObjectMapper mapper = helper.getObjectMapper();
|
||||
GroupByQuery groupByQuery = mapper.readValue(groupByQueryJson, GroupByQuery.class);
|
||||
MapBasedRow mapBasedRow = row.toMapBasedRow(groupByQuery);
|
||||
Map<String, Object> event = mapBasedRow.getEvent();
|
||||
assertEquals(new DateTime("2017-01-01T00:00:00Z", DateTimeZone.forTimeZone(TimeZone.getTimeZone("UTC"))), mapBasedRow.getTimestamp());
|
||||
assertThat(event, aMapWithSize(1));
|
||||
assertThat(event, hasEntry("revenue", new BigDecimal("15000000010.000000005")));
|
||||
Assert.assertEquals(
|
||||
new DateTime("2017-01-01T00:00:00Z", DateTimeZone.forTimeZone(TimeZone.getTimeZone("UTC"))),
|
||||
mapBasedRow.getTimestamp()
|
||||
);
|
||||
Assert.assertThat(event, IsMapWithSize.aMapWithSize(3));
|
||||
Assert.assertThat(
|
||||
event,
|
||||
IsMapContaining.hasEntry("cbdRevenueFromString", new BigDecimal("15000000010.000000005"))
|
||||
);
|
||||
// long conversion of 5000000000.000000005 results in null/0 value
|
||||
Assert.assertThat(
|
||||
event,
|
||||
IsMapContaining.hasEntry("cbdRevenueFromLong", new BigDecimal("10000000010.000000000"))
|
||||
);
|
||||
// double input changes 5000000000.000000005 to 5000000000.5 to fit in double mantissa space
|
||||
Assert.assertThat(
|
||||
event,
|
||||
IsMapContaining.hasEntry("cbdRevenueFromDouble", new BigDecimal("15000000010.500000000"))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
[
|
||||
{
|
||||
"type": "compressedBigDecimal",
|
||||
"name": "revenue",
|
||||
"name": "bigDecimalRevenue",
|
||||
"fieldName": "revenue",
|
||||
"scale": 9,
|
||||
"size": 3
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
20170101,mail,0.0
|
||||
20170101,sports,10.000000000
|
||||
20170101,mail,-1.000000000
|
||||
20170101,news,9999999999.000000000
|
||||
20170101,sports,5000000000.000000005
|
||||
20170101,mail,2.0
|
||||
20170101,mail,0.0,0.0,0.0
|
||||
20170101,sports,10.000000000,10.000000000,10.000000000
|
||||
20170101,mail,-1.000000000,-1.000000000,-1.000000000
|
||||
20170101,news,9999999999.000000000,9999999999.000000000,9999999999.000000000
|
||||
20170101,sports,5000000000.000000005,5000000000.000000005,5000000000.5
|
||||
20170101,mail,2.0,2.0,2.0
|
||||
|
|
|
|
@ -8,13 +8,24 @@
|
|||
},
|
||||
"dimensionsSpec": {
|
||||
"dimensions": [
|
||||
"property"
|
||||
"property",
|
||||
"revenue",
|
||||
{
|
||||
"type": "long",
|
||||
"name": "longRevenue"
|
||||
},
|
||||
{
|
||||
"type": "double",
|
||||
"name": "doubleRevenue"
|
||||
}
|
||||
]
|
||||
},
|
||||
"columns": [
|
||||
"timestamp",
|
||||
"property",
|
||||
"revenue"
|
||||
"revenue",
|
||||
"longRevenue",
|
||||
"doubleRevenue"
|
||||
]
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,10 +7,24 @@
|
|||
"aggregations": [
|
||||
{
|
||||
"type": "compressedBigDecimal",
|
||||
"name": "revenue",
|
||||
"name": "cbdRevenueFromString",
|
||||
"fieldName": "revenue",
|
||||
"scale": 9,
|
||||
"size": 3
|
||||
},
|
||||
{
|
||||
"type": "compressedBigDecimal",
|
||||
"name": "cbdRevenueFromLong",
|
||||
"fieldName": "longRevenue",
|
||||
"scale": 9,
|
||||
"size": 3
|
||||
},
|
||||
{
|
||||
"type": "compressedBigDecimal",
|
||||
"name": "cbdRevenueFromDouble",
|
||||
"fieldName": "doubleRevenue",
|
||||
"scale": 9,
|
||||
"size": 3
|
||||
}
|
||||
],
|
||||
"intervals": [
|
||||
|
|
Loading…
Reference in New Issue