compressed big decimal - module (#10705)

Compressed Big Decimal is an extension which provides support for 
Mutable big decimal value that can be used to accumulate values 
without losing precision or reallocating memory. This type helps in 
absolute precision arithmetic on large numbers in applications, 
where greater level of accuracy is required, such as financial 
applications, currency based transactions. This helps avoid rounding 
issues where in potentially large amount of money can be lost.

Accumulation requires that the two numbers have the same scale, 
but does not require that they are of the same size. If the value 
being accumulated has a larger underlying array than this value 
(the result), then the higher order bits are dropped, similar to what 
happens when adding a long to an int and storing the result in an 
int. A compressed big decimal that holds its data with an embedded 
array.

Compressed big decimal is an absolute number based complex type 
based on big decimal in Java. This supports all the functionalities 
supported by Java Big Decimal. Java Big Decimal is not mutable in 
order to avoid big garbage collection issues. Compressed big decimal 
is needed to mutate the value in the accumulator.
This commit is contained in:
senthilkv 2022-09-06 03:06:57 -04:00 committed by GitHub
parent 7d332c6f6a
commit 3d9aef225d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 3561 additions and 0 deletions

View File

@ -0,0 +1,241 @@
---
id: compressed-big-decimal
title: "Compressed Big Decimal"
---
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
## Overview
**Compressed Big Decimal** is an extension which provides support for Mutable big decimal value that can be used to accumulate values without losing precision or reallocating memory. This type helps in absolute precision arithmetic on large numbers in applications, where greater level of accuracy is required, such as financial applications, currency based transactions. This helps avoid rounding issues where in potentially large amount of money can be lost.
Accumulation requires that the two numbers have the same scale, but does not require that they are of the same size. If the value being accumulated has a larger underlying array than this value (the result), then the higher order bits are dropped, similar to what happens when adding a long to an int and storing the result in an int. A compressed big decimal that holds its data with an embedded array.
Compressed big decimal is an absolute number based complex type based on big decimal in Java. This supports all the functionalities supported by Java Big Decimal. Java Big Decimal is not mutable in order to avoid big garbage collection issues. Compressed big decimal is needed to mutate the value in the accumulator.
#### Main enhancements provided by this extension:
1. Functionality: Mutating Big decimal type with greater precision
2. Accuracy: Provides greater level of accuracy in decimal arithmetic
## Operations
To use this extension, make sure to [load](../../development/extensions.md#loading-extensions) `compressed-big-decimal` to your config file.
## Configuration
There are currently no configuration properties specific to Compressed Big Decimal
## Limitations
* Compressed Big Decimal does not provide correct result when the value being accumulated has a larger underlying array than this value (the result), then the higher order bits are dropped, similar to what happens when adding a long to an int and storing the result in an int.
### Ingestion Spec:
* Most properties in the Ingest spec derived from [Ingestion Spec](../../ingestion/index.md) / [Data Formats](../../ingestion/data-formats.md)
|property|description|required?|
|--------|-----------|---------|
|metricsSpec|Metrics Specification, In metrics specification while specifying metrics details such as name, type should be specified as compressedBigDecimal|Yes|
### Query spec:
* Most properties in the query spec derived from [groupBy query](../../querying/groupbyquery.md) / [timeseries](../../querying/timeseriesquery.md), see documentation for these query types.
|property|description|required?|
|--------|-----------|---------|
|queryType|This String should always be either "groupBy" OR "timeseries"; this is the first thing Druid looks at to figure out how to interpret the query.|yes|
|dataSource|A String or Object defining the data source to query, very similar to a table in a relational database. See [DataSource](../../querying/datasource.md) for more information.|yes|
|dimensions|A JSON list of [DimensionSpec](../../querying/dimensionspecs.md) (Notice that property is optional)|no|
|limitSpec|See [LimitSpec](../../querying/limitspec.md)|no|
|having|See [Having](../../querying/having.md)|no|
|granularity|A period granularity; See [Period Granularities](../../querying/granularities.html#period-granularities)|yes|
|filter|See [Filters](../../querying/filters.md)|no|
|aggregations|Aggregations forms the input to Averagers; See [Aggregations](../../querying/aggregations.md). The Aggregations must specify type, scale and size as follows for compressedBigDecimal Type ```"aggregations": [{"type": "compressedBigDecimal","name": "..","fieldName": "..","scale": [Numeric],"size": [Numeric]}```. Please refer query example in Examples section. |Yes|
|postAggregations|Supports only aggregations as input; See [Post Aggregations](../../querying/post-aggregations.md)|no|
|intervals|A JSON Object representing ISO-8601 Intervals. This defines the time ranges to run the query over.|yes|
|context|An additional JSON Object which can be used to specify certain flags.|no|
## Examples
Consider the data as
|Date|Item|SaleAmount|
|--------|-----------|---------|
```
20201208,ItemA,0.0
20201208,ItemB,10.000000000
20201208,ItemA,-1.000000000
20201208,ItemC,9999999999.000000000
20201208,ItemB,5000000000.000000005
20201208,ItemA,2.0
20201208,ItemD,0.0
```
IngestionSpec syntax:
```json
{
"type": "index_parallel",
"spec": {
"dataSchema": {
"dataSource": "invoices",
"timestampSpec": {
"column": "timestamp",
"format": "yyyyMMdd"
},
"dimensionsSpec": {
"dimensions": [{
"type": "string",
"name": "itemName"
}]
},
"metricsSpec": [{
"name": "saleAmount",
"type": *"compressedBigDecimal"*,
"fieldName": "saleAmount"
}],
"transformSpec": {
"filter": null,
"transforms": []
},
"granularitySpec": {
"type": "uniform",
"rollup": false,
"segmentGranularity": "DAY",
"queryGranularity": "none",
"intervals": ["2020-12-08/2020-12-09"]
}
},
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "local",
"baseDir": "/home/user/sales/data/staging/invoice-data",
"filter": "invoice-001.20201208.txt"
},
"inputFormat": {
"type": "tsv",
"delimiter": ",",
"skipHeaderRows": 0,
"columns": [
"timestamp",
"itemName",
"saleAmount"
]
}
},
"tuningConfig": {
"type": "index_parallel"
}
}
}
```
### Group By Query example
Calculating sales groupBy all.
Query syntax:
```json
{
"queryType": "groupBy",
"dataSource": "invoices",
"granularity": "ALL",
"dimensions": [
],
"aggregations": [
{
"type": "compressedBigDecimal",
"name": "saleAmount",
"fieldName": "saleAmount",
"scale": 9,
"size": 3
}
],
"intervals": [
"2020-01-08T00:00:00.000Z/P1D"
]
}
```
Result:
```json
[ {
"version" : "v1",
"timestamp" : "2020-12-08T00:00:00.000Z",
"event" : {
"revenue" : 15000000010.000000005
}
} ]
```
Had you used *doubleSum* instead of compressedBigDecimal the result would be
```json
[ {
"timestamp" : "2020-12-08T00:00:00.000Z",
"result" : {
"revenue" : 1.500000001E10
}
} ]
```
As shown above the precision is lost and could lead to loss in money.
### TimeSeries Query Example
Query syntax:
```json
{
"queryType": "timeseries",
"dataSource": "invoices",
"granularity": "ALL",
"aggregations": [
{
"type": "compressedBigDecimal",
"name": "revenue",
"fieldName": "revenue",
"scale": 9,
"size": 3
}
],
"filter": {
"type": "not",
"field": {
"type": "selector",
"dimension": "itemName",
"value": "ItemD"
}
},
"intervals": [
"2020-12-08T00:00:00.000Z/P1D"
]
}
```
Result:
```json
[ {
"timestamp" : "2020-12-08T00:00:00.000Z",
"result" : {
"revenue" : 15000000010.000000005
}
} ]
```

View File

@ -79,6 +79,7 @@ All of these community extensions can be downloaded using [pull-deps](../operati
|ambari-metrics-emitter|Ambari Metrics Emitter |[link](../development/extensions-contrib/ambari-metrics-emitter.md)|
|druid-cassandra-storage|Apache Cassandra deep storage.|[link](../development/extensions-contrib/cassandra.md)|
|druid-cloudfiles-extensions|Rackspace Cloudfiles deep storage and firehose.|[link](../development/extensions-contrib/cloudfiles.md)|
|druid-compressed-bigdecimal|Compressed Big Decimal Type | [link](../development/extensions-contrib/compressed-big-decimal.md)|
|druid-distinctcount|DistinctCount aggregator|[link](../development/extensions-contrib/distinctcount.md)|
|druid-redis-cache|A cache implementation for Druid based on Redis.|[link](../development/extensions-contrib/redis-cache.md)|
|druid-time-min-max|Min/Max aggregator for timestamp.|[link](../development/extensions-contrib/time-min-max.md)|

View File

@ -0,0 +1,130 @@
<?xml version="1.0"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<project
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.druid</groupId>
<artifactId>druid</artifactId>
<version>25.0.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<groupId>org.apache.druid.extensions.contrib</groupId>
<artifactId>druid-compressed-bigdecimal</artifactId>
<name>druid-compressed-bigdecimal</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-core</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<!-- Tests -->
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-core</artifactId>
<version>${project.parent.version}</version>
<classifier>tests</classifier>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>java-hamcrest</artifactId>
<version>2.0.0.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<version>2.0.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
<version>4.1.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.10.5</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>16.0.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.10.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.10.2</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.compressedbigdecimal;
import java.math.BigDecimal;
import java.math.BigInteger;
/**
* A compressed big decimal that holds its data with an embedded array.
*/
@SuppressWarnings("serial")
public class ArrayCompressedBigDecimal extends CompressedBigDecimal<ArrayCompressedBigDecimal>
{
private static final int BYTE_MASK = 0xff;
private final int[] array;
/**
* Construct an AccumulatingBigDecimal using the referenced initial
* value and scale.
*
* @param initialVal initial value
* @param scale the scale to use
*/
public ArrayCompressedBigDecimal(long initialVal, int scale)
{
super(scale);
this.array = new int[2];
this.array[0] = (int) initialVal;
this.array[1] = (int) (initialVal >>> 32);
}
/**
* Construct an CompressedBigDecimal from an equivalent {@link BigDecimal}.
* The passed value's unscaled number is converted into byte array and then it
* compresses the unscaled number to array
*
* @param initialVal The BigDecimal value.
*/
public ArrayCompressedBigDecimal(BigDecimal initialVal)
{
super(initialVal.scale());
BigInteger unscaled = initialVal.unscaledValue();
byte[] bytes = unscaled.toByteArray();
int arrayLen = (bytes.length + 3) / 4;
this.array = new int[arrayLen];
if (initialVal.signum() == 0) {
// initial value is 0. Nothing to copy
return;
}
int bytesIdx = bytes.length;
for (int ii = 0; ii < arrayLen; ++ii) {
this.array[ii] =
(BYTE_MASK & bytes[--bytesIdx]) |
(bytesIdx != 0 ? (BYTE_MASK & bytes[--bytesIdx]) : (((int) bytes[0]) >> 8)) << 8 |
(bytesIdx != 0 ? (BYTE_MASK & bytes[--bytesIdx]) : (((int) bytes[0]) >> 8)) << 16 |
(bytesIdx != 0 ? (BYTE_MASK & bytes[--bytesIdx]) : (((int) bytes[0]) >> 8)) << 24;
}
}
/**
* Construct an CompressedBigDecimal that is a copy of the passed in value.
*
* @param initVal the initial value
*/
public ArrayCompressedBigDecimal(CompressedBigDecimal<?> initVal)
{
super(initVal.getScale());
this.array = new int[initVal.getArraySize()];
for (int ii = 0; ii < initVal.getArraySize(); ++ii) {
this.array[ii] = initVal.getArrayEntry(ii);
}
}
/**
* Private constructor to build an CompressedBigDecimal that wraps
* a passed in array of ints. The constructor takes ownership of the
* array and its contents may be modified.
*
* @param array the initial magnitude
* @param scale the scale
*/
private ArrayCompressedBigDecimal(int[] array, int scale)
{
super(scale);
this.array = array;
}
/**
* Static method to construct an CompressedBigDecimal that wraps an
* underlying array.
*
* @param array The array to wrap
* @param scale The scale to use
* @return An CompressedBigDecimal
*/
public static ArrayCompressedBigDecimal wrap(int[] array, int scale)
{
return new ArrayCompressedBigDecimal(array, scale);
}
/**
* Allocate a new CompressedBigDecimal with the specified size and scale.
*
* @param size size of the int array used for calculations
* @param scale scale of the number
* @return CompressedBigDecimal
*/
public static ArrayCompressedBigDecimal allocate(int size, int scale)
{
int[] arr = new int[size];
return new ArrayCompressedBigDecimal(arr, scale);
}
/* (non-Javadoc)
* @see org.apache.druid.compressedbigdecimal.CompressedBigDecimal#getArraySize()
*/
@Override
public int getArraySize()
{
return array.length;
}
/**
* Package private access to internal array.
*
* @return the array
*/
int[] getArray()
{
return array;
}
/**
* Package private access to entry in internal array.
*
* @param idx index to retrieve
* @return the entry
*/
@Override
protected int getArrayEntry(int idx)
{
return array[idx];
}
/**
* Package private access to set entry in internal array.
*
* @param idx index to retrieve
* @param val value to set
*/
@Override
protected void setArrayEntry(int idx, int val)
{
array[idx] = val;
}
}

View File

@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.compressedbigdecimal;
import java.nio.ByteBuffer;
/**
* A compressed big decimal that holds its data with an embedded array.
*/
@SuppressWarnings("serial")
public class ByteBufferCompressedBigDecimal extends CompressedBigDecimal<ByteBufferCompressedBigDecimal>
{
private final ByteBuffer buf;
private final int position;
private final int size;
/**
* Construct an AccumulatingBigDecimal using the referenced initial
* value and scale.
*
* @param buf The byte buffer to wrap
* @param position the position in the byte buffer where the data should be stored
* @param size the size (in ints) of the byte buffer
* @param scale the scale to use
*/
public ByteBufferCompressedBigDecimal(ByteBuffer buf, int position, int size, int scale)
{
super(scale);
this.buf = buf;
this.position = position;
this.size = size;
}
/**
* Construct a CompressedBigDecimal that uses a ByteBuffer for its storage and whose
* initial value is copied from the specified CompressedBigDecimal.
*
* @param buf the ByteBuffer to use for storage
* @param position the position in the ByteBuffer
* @param val initial value
*/
public ByteBufferCompressedBigDecimal(ByteBuffer buf, int position, CompressedBigDecimal<?> val)
{
super(val.getScale());
this.buf = buf;
this.position = position;
this.size = val.getArraySize();
copyToBuffer(buf, position, size, val);
}
/* (non-Javadoc)
* @see org.apache.druid.compressedbigdecimal.CompressedBigDecimal#getArraySize()
*/
@Override
public int getArraySize()
{
return size;
}
/**
* Package private access to entry in internal array.
*
* @param idx index to retrieve
* @return the entry
*/
@Override
protected int getArrayEntry(int idx)
{
return buf.getInt(position + idx * Integer.BYTES);
}
/**
* Package private access to set entry in internal array.
*
* @param idx index to retrieve
* @param val value to set
*/
@Override
protected void setArrayEntry(int idx, int val)
{
buf.putInt(position + idx * Integer.BYTES, val);
}
/**
* Copy a compressed big decimal into a Bytebuffer in a format understood by this class.
*
* @param buf The buffer
* @param position The position in the buffer to place the value
* @param size The space (in number of ints) allocated for the value
* @param val THe value to copy
*/
public static void copyToBuffer(ByteBuffer buf, int position, int size, CompressedBigDecimal<?> val)
{
if (val.getArraySize() > size) {
throw new IllegalArgumentException("Right hand side too big to fit in the result value");
}
for (int ii = 0; ii < size; ++ii) {
buf.putInt(position + ii * Integer.BYTES, val.getArrayEntry(ii));
}
}
}

View File

@ -0,0 +1,329 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.compressedbigdecimal;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.function.ToIntBiFunction;
/**
* Mutable big decimal value that can be used to accumulate values without losing precision or reallocating memory.
* This helps in revenue based calculations
*
* @param <T> Type of actual derived class that contains the underlying data
*/
@SuppressWarnings("serial")
public abstract class CompressedBigDecimal<T extends CompressedBigDecimal<T>> extends Number
implements Comparable<CompressedBigDecimal<T>>
{
private static final long INT_MASK = 0x00000000ffffffffL;
private final int scale;
/**
* Construct an CompressedBigDecimal by specifying the scale, array size, and a function to get
* and put array values.
*
* @param scale scale of the compressed big decimal
*/
protected CompressedBigDecimal(int scale)
{
this.scale = scale;
}
/**
* Accumulate (add) the passed in value into the current total. This
* modifies the value of the current object. Accumulation requires that
* the two numbers have the same scale, but does not require that they are
* of the same size. If the value being accumulated has a larger underlying array
* than this value (the result), then the higher order bits are dropped, similar to
* what happens when adding a long to an int and storing the result in an int.
*
* @param <S> type of compressedbigdecimal to accumulate
* @param rhs The object to accumulate
* @return a reference to <b>this</b>
*/
public <S extends CompressedBigDecimal<S>> CompressedBigDecimal<T> accumulate(CompressedBigDecimal<S> rhs)
{
if (rhs.scale != scale) {
throw new IllegalArgumentException("Cannot accumulate MutableBigDecimals with differing scales");
}
if (rhs.getArraySize() > getArraySize()) {
throw new IllegalArgumentException("Right hand side too big to fit in the result value");
}
internalAdd(getArraySize(), this, CompressedBigDecimal::getArrayEntry, CompressedBigDecimal::setArrayEntry,
rhs.getArraySize(), rhs, CompressedBigDecimal::getArrayEntry);
return this;
}
/**
* Clear any accumulated value, resetting to zero. Scale is preserved at its original value.
*
* @return this
*/
public CompressedBigDecimal<T> reset()
{
for (int ii = 0; ii < getArraySize(); ++ii) {
setArrayEntry(ii, 0);
}
return this;
}
/**
* Accumulate values into this mutable value. Values to be accumulated
* must have the same scale as the result. The incoming value may have
* been more precision than the result. If so, the upper bits are truncated,
* similar to what happens when a long is assigned to an int.
*
* @param <R> type of the object containing the lhs array
* @param <S> type of obejct containing the rhs array
* @param llen the underlying left array size
* @param lhs the object containing the left array data
* @param lhsGet method reference to get an underlying left value
* @param lhsSet method reference to set an underlying left value
* @param rlen the underlying right array size
* @param rhs the object containing the right array data
* @param rhsGet method reference to get an underlying right value
*/
static <R, S> void internalAdd(int llen, R lhs, ToIntBiFunction<R, Integer> lhsGet, ObjBiIntConsumer<R> lhsSet,
int rlen, S rhs, ToIntBiFunction<S, Integer> rhsGet)
{
int commonLen = Integer.min(llen, rlen);
long carry = 0;
long sum;
// copy the common part
for (int ii = 0; ii < commonLen; ++ii) {
sum = (INT_MASK & lhsGet.applyAsInt(lhs, ii)) + (INT_MASK & rhsGet.applyAsInt(rhs, ii)) + carry;
lhsSet.accept(lhs, ii, (int) sum);
carry = sum >>> 32;
}
long signExtension = signumInternal(rlen, rhs, rhsGet) < 0 ? INT_MASK : 0;
// for the remaining portion of the lhs that didn't have matching
// rhs values, just propagate any necessary carry and sign extension
for (int ii = commonLen; ii < llen && (carry != 0 || signExtension != 0); ++ii) {
sum = (INT_MASK & lhsGet.applyAsInt(lhs, ii)) + signExtension + carry;
lhsSet.accept(lhs, ii, (int) sum);
carry = sum >>> 32;
}
// don't do anything with remaining rhs. That value is lost due to overflow.
}
/**
* Get a byte array representing the magnitude of this value,
* formatted for use by {@link BigInteger#BigInteger(byte[])}.
*
* @return the byte array for use in BigInteger
*/
private byte[] toByteArray()
{
int byteArrayLength = getArraySize() * 4;
byte[] bytes = new byte[byteArrayLength];
int byteIdx = 0;
for (int ii = getArraySize(); ii > 0; --ii) {
int val = getArrayEntry(ii - 1);
bytes[byteIdx + 3] = (byte) val;
val >>>= 8;
bytes[byteIdx + 2] = (byte) val;
val >>>= 8;
bytes[byteIdx + 1] = (byte) val;
val >>>= 8;
bytes[byteIdx] = (byte) val;
byteIdx += 4;
}
int leadingZeros = Integer.numberOfLeadingZeros(getArrayEntry(getArraySize() - 1));
int emptyBytes = leadingZeros / 8;
if (emptyBytes != 0) {
if (emptyBytes == byteArrayLength || leadingZeros % 8 == 0) {
// don't get rid of all the leading zeros if it is the complete number (array size must
// be a minimum of 1) or if trimming the byte might change the sign of the value (first
// one is on a byte boundary).
emptyBytes--;
}
return Arrays.copyOfRange(bytes, emptyBytes, byteArrayLength);
}
return bytes;
}
/**
* Return the scale of the value.
*
* @return the scale
*/
public int getScale()
{
return scale;
}
/**
* Return the array size.
*
* @return the array size
*/
protected abstract int getArraySize();
/**
* Get value from the array.
*
* @param idx the index
* @return value from the array at that index
*/
protected abstract int getArrayEntry(int idx);
/**
* Set value in the array.
*
* @param idx the index
* @param val the value
*/
protected abstract void setArrayEntry(int idx, int val);
/**
* Create a {@link BigDecimal} with the equivalent value to this
* instance.
*
* @return the BigDecimal value
*/
public BigDecimal toBigDecimal()
{
BigInteger bigInt = new BigInteger(toByteArray());
return new BigDecimal(bigInt, scale);
}
/* (non-Javadoc)
* @see java.lang.Object#toString()
*/
@Override
public String toString()
{
return toBigDecimal().toString();
}
/**
* Returns the signum function of this {@code BigDecimal}.
*
* @return -1, 0, or 1 as the value of this {@code BigDecimal} is negative, zero, or positive.
*/
public int signum()
{
return signumInternal(getArraySize(), this, CompressedBigDecimal::getArrayEntry);
}
/**
* Internal implementation if signum.
* For the Provided Compressed big decimal value it checks and returns
* -1 if Negative
* 0 if Zero
* 1 if Positive
* @param <S> type of object containing the array
* @param size the underlying array size
* @param rhs object that contains the underlying array
* @param valFunc method reference to get an underlying value
* @return -1, 0, or 1 as the value of this {@code BigDecimal} is negative, zero, or positive.
*/
protected static <S> int signumInternal(int size, S rhs, ToIntBiFunction<S, Integer> valFunc)
{
if (valFunc.applyAsInt(rhs, size - 1) < 0) {
return -1;
} else {
int agg = 0;
for (int ii = 0; ii < size; ++ii) {
agg |= valFunc.applyAsInt(rhs, ii);
}
if (agg == 0) {
return 0;
} else {
return 1;
}
}
}
/* (non-Javadoc)
* @see java.lang.Comparable#compareTo(java.lang.Object)
*/
@Override
public int compareTo(CompressedBigDecimal<T> o)
{
if (this.equals(o)) {
return 0;
}
return this.toBigDecimal().compareTo(o.toBigDecimal());
}
/**
* Returns the value of the specified number as an {@code int},
* which may involve rounding or truncation.
*
* @return the numeric value represented by this object after conversion
* to type {@code int}.
*/
@Override
public int intValue()
{
return toBigDecimal().setScale(0, BigDecimal.ROUND_HALF_UP).intValue();
}
/**
* Returns the value of the specified number as a {@code long},
* which may involve rounding or truncation.
*
* @return the numeric value represented by this object after conversion
* to type {@code long}.
*/
@Override
public long longValue()
{
return toBigDecimal().setScale(0, BigDecimal.ROUND_HALF_UP).longValue();
}
/**
* Returns the value of the specified number as a {@code float},
* which may involve rounding.
*
* @return the numeric value represented by this object after conversion
* to type {@code float}.
*/
@Override
public float floatValue()
{
return toBigDecimal().floatValue();
}
/**
* Returns the value of the specified number as a {@code double},
* which may involve rounding.
*
* @return the numeric value represented by this object after conversion
* to type {@code double}.
*/
@Override
public double doubleValue()
{
return toBigDecimal().doubleValue();
}
}

View File

@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.compressedbigdecimal;
import org.apache.druid.query.aggregation.AggregateCombiner;
import org.apache.druid.segment.ColumnValueSelector;
import javax.annotation.Nullable;
/**
* AggregateCombiner for CompressedBigDecimals.
*/
public class CompressedBigDecimalAggregateCombiner implements AggregateCombiner<CompressedBigDecimal<?>>
{
private CompressedBigDecimal<?> sum;
@Override
public void reset(@SuppressWarnings("rawtypes") ColumnValueSelector columnValueSelector)
{
@SuppressWarnings("unchecked")
ColumnValueSelector<CompressedBigDecimal<?>> selector =
(ColumnValueSelector<CompressedBigDecimal<?>>) columnValueSelector;
CompressedBigDecimal<?> cbd = selector.getObject();
if (sum == null) {
sum = new ArrayCompressedBigDecimal(cbd);
} else {
sum.reset();
sum.accumulate(cbd);
}
}
@Override
public void fold(@SuppressWarnings("rawtypes") ColumnValueSelector columnValueSelector)
{
@SuppressWarnings("unchecked")
ColumnValueSelector<CompressedBigDecimal<?>> selector =
(ColumnValueSelector<CompressedBigDecimal<?>>) columnValueSelector;
CompressedBigDecimal<?> cbd = selector.getObject();
if (sum == null) {
sum = new ArrayCompressedBigDecimal(cbd);
} else {
if (cbd.signum() != 0) {
sum.accumulate(cbd);
}
}
}
@Override
public double getDouble()
{
throw new UnsupportedOperationException("CompressedBigDecimalCombiner does not support getDouble()");
}
@Override
public float getFloat()
{
throw new UnsupportedOperationException("CompressedBigDecimalCombiner does not support getFloat()");
}
@Override
public long getLong()
{
throw new UnsupportedOperationException("CompressedBigDecimalCombiner does not support getLong()");
}
@Nullable
@Override
public CompressedBigDecimal<?> getObject()
{
return sum;
}
@SuppressWarnings("unchecked")
@Override
public Class<CompressedBigDecimal<?>> classOfObject()
{
return (Class<CompressedBigDecimal<?>>) (Class<?>) CompressedBigDecimal.class;
}
}

View File

@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.compressedbigdecimal;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.segment.ColumnValueSelector;
/**
* An Aggregator to aggregate big decimal values.
*/
public class CompressedBigDecimalAggregator implements Aggregator
{
private final ColumnValueSelector<CompressedBigDecimal<?>> selector;
private final CompressedBigDecimal<?> sum;
/**
* Constructor.
*
* @param size the size to allocate
* @param scale the scale
* @param selector that has the metric value
*/
public CompressedBigDecimalAggregator(
int size,
int scale,
ColumnValueSelector<CompressedBigDecimal<?>> selector
)
{
this.selector = selector;
this.sum = ArrayCompressedBigDecimal.allocate(size, scale);
}
/* (non-Javadoc)
* @see org.apache.druid.query.aggregation.Aggregator#aggregate()
*/
@Override
public void aggregate()
{
CompressedBigDecimal<?> selectedObject = selector.getObject();
if (selectedObject != null) {
if (selectedObject.getScale() != sum.getScale()) {
selectedObject = Utils.scaleUp(selectedObject);
}
sum.accumulate(selectedObject);
}
}
/* (non-Javadoc)
* @see org.apache.druid.query.aggregation.Aggregator#get()
*/
@Override
public Object get()
{
return sum;
}
/* (non-Javadoc)
* @see org.apache.druid.query.aggregation.Aggregator#getFloat()
*/
@Override
public float getFloat()
{
throw new UnsupportedOperationException("CompressedBigDecimalAggregator does not support getFloat()");
}
/* (non-Javadoc)
* @see org.apache.druid.query.aggregation.Aggregator#getLong()
*/
@Override
public long getLong()
{
throw new UnsupportedOperationException("CompressedBigDecimalAggregator does not support getLong()");
}
/* (non-Javadoc)
* @see org.apache.druid.query.aggregation.Aggregator#close()
*/
@Override
public void close()
{
// no resources to cleanup
}
}

View File

@ -0,0 +1,302 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.compressedbigdecimal;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.AggregateCombiner;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.column.ValueType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
/**
* An aggregator factory to generate longSum aggregator object.
*/
public class CompressedBigDecimalAggregatorFactory
extends NullableNumericAggregatorFactory<ColumnValueSelector<CompressedBigDecimal<?>>>
{
public static final int DEFAULT_SCALE = 9;
public static final int DEFAULT_SIZE = 3;
private static final byte CACHE_TYPE_ID = 0x37;
public static final Comparator<CompressedBigDecimal<?>> COMPARATOR = new Comparator<CompressedBigDecimal<?>>()
{
@SuppressWarnings({"unchecked", "rawtypes"})
@Override
public int compare(CompressedBigDecimal lhs, CompressedBigDecimal rhs)
{
return lhs.compareTo(rhs);
}
};
private final String name;
private final String fieldName;
private final int size;
private final int scale;
/**
* Constructor.
*
* @param name metric field name
* @param fieldName fieldName metric field name
* @param size size of the int array used for calculations
* @param scale scale of the number
*/
@JsonCreator
public CompressedBigDecimalAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldName") String fieldName,
@JsonProperty(value = "size", required = false) Integer size,
@JsonProperty(value = "scale", required = false) Integer scale
)
{
this.name = name;
this.fieldName = fieldName;
this.size = size == null ? DEFAULT_SIZE : size;
this.scale = scale == null ? DEFAULT_SCALE : scale;
}
@SuppressWarnings("unchecked")
@Override
protected ColumnValueSelector<CompressedBigDecimal<?>> selector(ColumnSelectorFactory metricFactory)
{
return (ColumnValueSelector<CompressedBigDecimal<?>>) metricFactory.makeColumnValueSelector(fieldName);
}
@Override
protected Aggregator factorize(ColumnSelectorFactory metricFactory,
@Nonnull ColumnValueSelector<CompressedBigDecimal<?>> selector)
{
return new CompressedBigDecimalAggregator(size, scale, selector);
}
@Override
protected BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory,
@Nonnull ColumnValueSelector<CompressedBigDecimal<?>> selector)
{
return new CompressedBigDecimalBufferAggregator(size, scale, selector);
}
/* (non-Javadoc)
* @see org.apache.druid.query.aggregation.AggregatorFactory#getComparator()
*/
@Override
public Comparator<CompressedBigDecimal<?>> getComparator()
{
return COMPARATOR;
}
/* (non-Javadoc)
* @see org.apache.druid.query.aggregation.AggregatorFactory#combine(java.lang.Object, java.lang.Object)
*/
@Nullable
@Override
public Object combine(Object lhs, Object rhs)
{
if (lhs == null && rhs == null) {
return ArrayCompressedBigDecimal.allocate(size, scale);
} else if (lhs == null) {
return rhs;
} else if (rhs == null) {
return lhs;
} else {
// Allocate a new result and accumlate both left and right into it.
// This ensures that the result has the correct scale, avoiding possible IllegalArgumentExceptions
// due to truncation when the deserialized objects aren't big enough to hold the accumlated result.
// The most common case this avoids is deserializing 0E-9 into a CompressedBigDecimal with array
// size 1 and then accumulating a larger value into it.
CompressedBigDecimal<?> retVal = ArrayCompressedBigDecimal.allocate(size, scale);
CompressedBigDecimal<?> left = (CompressedBigDecimal<?>) lhs;
CompressedBigDecimal<?> right = (CompressedBigDecimal<?>) rhs;
if (left.signum() != 0) {
retVal.accumulate(left);
}
if (right.signum() != 0) {
retVal.accumulate(right);
}
return retVal;
}
}
/* (non-Javadoc)
* @see org.apache.druid.query.aggregation.AggregatorFactory#getCombiningFactory()
*/
@Override
public AggregatorFactory getCombiningFactory()
{
return new CompressedBigDecimalAggregatorFactory(name, name, size, scale);
}
@Override
public AggregateCombiner<CompressedBigDecimal<?>> makeAggregateCombiner()
{
return new CompressedBigDecimalAggregateCombiner();
}
/* (non-Javadoc)
* @see org.apache.druid.query.aggregation.AggregatorFactory#getRequiredColumns()
*/
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Collections.singletonList(new CompressedBigDecimalAggregatorFactory(
fieldName,
fieldName,
size,
scale
));
}
/* (non-Javadoc)
* @see org.apache.druid.query.aggregation.AggregatorFactory#deserialize(java.lang.Object)
*/
@Nullable
@Override
public Object deserialize(Object object)
{
if (object == null) {
return null;
} else if (object instanceof BigDecimal) {
return new ArrayCompressedBigDecimal((BigDecimal) object);
} else if (object instanceof Double) {
return new ArrayCompressedBigDecimal(new BigDecimal((Double) object));
} else if (object instanceof String) {
return new ArrayCompressedBigDecimal(new BigDecimal((String) object));
} else {
throw new RuntimeException("unknown type in deserialize: " + object.getClass().getSimpleName());
}
}
/* (non-Javadoc)
* @see org.apache.druid.query.aggregation.AggregatorFactory#requiredFields()
*/
@Override
public List<String> requiredFields()
{
return Collections.singletonList(fieldName);
}
/* (non-Javadoc) Get Type */
@Override
public ValueType getType()
{
return ValueType.COMPLEX;
}
/* (non-Javadoc)
* @see org.apache.druid.query.aggregation.AggregatorFactory#getTypeName()
*/
@Override
public String getComplexTypeName()
{
return CompressedBigDecimalModule.COMPRESSED_BIG_DECIMAL;
}
/* (non-Javadoc)
* @see org.apache.druid.query.aggregation.AggregatorFactory#getCacheKey()
*/
@Override
public byte[] getCacheKey()
{
byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
return ByteBuffer.allocate(1 + fieldNameBytes.length).put(CACHE_TYPE_ID).put(fieldNameBytes).array();
}
/* (non-Javadoc)
* @see org.apache.druid.query.aggregation.AggregatorFactory#finalizeComputation(java.lang.Object)
*/
@Override
public Object finalizeComputation(Object object)
{
CompressedBigDecimal<?> compressedBigDecimal = (CompressedBigDecimal<?>) object;
BigDecimal bigDecimal = compressedBigDecimal.toBigDecimal();
return bigDecimal.compareTo(BigDecimal.ZERO) == 0 ? 0 : bigDecimal;
}
/* (non-Javadoc)
* @see org.apache.druid.query.aggregation.AggregatorFactory#getName()
*/
@Override
@JsonProperty
public String getName()
{
return name;
}
/**
* Get the filed name.
*
* @return dimension/metric field name
*/
@JsonProperty
public String getFieldName()
{
return fieldName;
}
@JsonProperty
public int getScale()
{
return scale;
}
@JsonProperty
public int getSize()
{
return size;
}
/* (non-Javadoc)
* @see org.apache.druid.query.aggregation.AggregatorFactory#getMaxIntermediateSize()
*/
@Override
public int getMaxIntermediateSize()
{
return Integer.BYTES * size;
}
@Override
public String toString()
{
return "CompressedBigDecimalAggregatorFactory{" +
"name='" + getName() + '\'' +
", type='" + getComplexTypeName() + '\'' +
", fieldName='" + getFieldName() + '\'' +
", requiredFields='" + requiredFields() + '\'' +
", size='" + getSize() + '\'' +
", scale='" + getScale() + '\'' +
'}';
}
}

View File

@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.compressedbigdecimal;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.segment.ColumnValueSelector;
import java.nio.ByteBuffer;
/**
* A buffered aggregator to aggregate big decimal value.
*/
public class CompressedBigDecimalBufferAggregator implements BufferAggregator
{
//Cache will hold the aggregated value.
//We are using ByteBuffer to hold the key to the aggregated value.
private final ColumnValueSelector<CompressedBigDecimal<?>> selector;
private final int size;
private final int scale;
/**
* Constructor.
*
* @param size the size to allocate
* @param scale the scale
* @param selector a ColumnSelector to retrieve incoming values
*/
public CompressedBigDecimalBufferAggregator(
int size,
int scale,
ColumnValueSelector<CompressedBigDecimal<?>> selector
)
{
this.selector = selector;
this.size = size;
this.scale = scale;
}
/* (non-Javadoc)
* @see org.apache.druid.query.aggregation.BufferAggregator#init(java.nio.ByteBuffer, int)
*/
@Override
public void init(ByteBuffer buf, int position)
{
for (int ii = 0; ii < size; ++ii) {
buf.putInt(position + (ii * Integer.BYTES), 0);
}
}
/* (non-Javadoc)
* @see org.apache.druid.query.aggregation.BufferAggregator#aggregate(java.nio.ByteBuffer, int)
*/
@Override
public void aggregate(ByteBuffer buf, int position)
{
CompressedBigDecimal<?> addend = selector.getObject();
if (addend != null) {
Utils.accumulate(buf, position, size, scale, addend);
}
}
/* (non-Javadoc)
* @see org.apache.druid.query.aggregation.BufferAggregator#get(java.nio.ByteBuffer, int)
*/
@Override
public Object get(ByteBuffer buf, int position)
{
return new ByteBufferCompressedBigDecimal(buf, position, size, scale);
}
/* (non-Javadoc)
* @see org.apache.druid.query.aggregation.BufferAggregator#getFloat(java.nio.ByteBuffer, int)
*/
@Override
public float getFloat(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException("CompressedBigDecimalBufferAggregator does not support getFloat()");
}
/* (non-Javadoc)
* @see org.apache.druid.query.aggregation.BufferAggregator#getLong(java.nio.ByteBuffer, int)
*/
@Override
public long getLong(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException("CompressedBigDecimalBufferAggregator does not support getLong()");
}
/* (non-Javadoc)
* @see org.apache.druid.query.aggregation.BufferAggregator#close()
*/
@Override
public void close()
{
// no resources to cleanup
}
}

View File

@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.compressedbigdecimal;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.ObjectColumnSelector;
import org.apache.druid.segment.column.ComplexColumn;
import org.apache.druid.segment.data.ColumnarInts;
import org.apache.druid.segment.data.ColumnarMultiInts;
import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.segment.data.ReadableOffset;
import javax.annotation.Nullable;
import java.io.IOException;
/**
* Accumulating Big Decimal column in druid segment.
*/
public class CompressedBigDecimalColumn implements ComplexColumn
{
public static final Logger LOGGER = new Logger(CompressedBigDecimalColumn.class);
private final ColumnarInts scale;
private final ColumnarMultiInts magnitude;
/**
* Constructor.
*
* @param scale scale of the rows
* @param magnitude LongColumn representing magnitudes
*/
public CompressedBigDecimalColumn(ColumnarInts scale, ColumnarMultiInts magnitude)
{
this.scale = scale;
this.magnitude = magnitude;
}
@Override
public Class<CompressedBigDecimalColumn> getClazz()
{
return CompressedBigDecimalColumn.class;
}
@Override
public String getTypeName()
{
return CompressedBigDecimalModule.COMPRESSED_BIG_DECIMAL;
}
@Override
public CompressedBigDecimal<?> getRowValue(int rowNum)
{
int s = scale.get(rowNum);
// make a copy of the value from magnitude because the IndexedInts returned
// from druid is mutable and druid reuses the object for future calls.
IndexedInts vals = magnitude.get(rowNum);
int size = vals.size();
int[] array = new int[size];
for (int ii = 0; ii < size; ++ii) {
array[ii] = vals.get(ii);
}
return ArrayCompressedBigDecimal.wrap(array, s);
}
@Override
public int getLength()
{
return scale.size();
}
@Override
public ColumnValueSelector<?> makeColumnValueSelector(final ReadableOffset offset)
{
return new ObjectColumnSelector<CompressedBigDecimal<?>>()
{
@Override @Nullable
public CompressedBigDecimal<?> getObject()
{
return getRowValue(offset.getOffset());
}
@Override @SuppressWarnings("unchecked")
public Class<CompressedBigDecimal<?>> classOfObject()
{
return (Class<CompressedBigDecimal<?>>) (Class<?>) CompressedBigDecimal.class;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("column", CompressedBigDecimalColumn.this);
}
};
}
@Override
public void close()
{
try {
scale.close();
}
catch (IOException ex) {
LOGGER.error(ex, "failed to clean up scale part of CompressedBigDecimalColumn");
}
try {
magnitude.close();
}
catch (IOException ex) {
LOGGER.error(ex, "failed to clean up magnitude part of CompressedBigDecimalColumn");
}
}
}

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.compressedbigdecimal;
import com.google.common.base.Supplier;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.column.ComplexColumn;
import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSupplier;
import org.apache.druid.segment.data.V3CompressedVSizeColumnarMultiIntsSupplier;
import java.nio.ByteBuffer;
/**
* Complex column supplier that understands {@link CompressedBigDecimal} values.
*/
public class CompressedBigDecimalColumnPartSupplier implements Supplier<ComplexColumn>
{
public static final int VERSION = 0x1;
private final CompressedVSizeColumnarIntsSupplier scaleSupplier;
private final V3CompressedVSizeColumnarMultiIntsSupplier magnitudeSupplier;
/**
* Constructor.
*
* @param scaleSupplier scale supplier
* @param magnitudeSupplier supplied of results
*/
public CompressedBigDecimalColumnPartSupplier(
CompressedVSizeColumnarIntsSupplier scaleSupplier,
V3CompressedVSizeColumnarMultiIntsSupplier magnitudeSupplier
)
{
this.scaleSupplier = scaleSupplier;
this.magnitudeSupplier = magnitudeSupplier;
}
/**
* Compressed.
*
* @param buffer Byte buffer
* @return new instance of CompressedBigDecimalColumnPartSupplier
*/
public static CompressedBigDecimalColumnPartSupplier fromByteBuffer(
ByteBuffer buffer
)
{
byte versionFromBuffer = buffer.get();
if (versionFromBuffer == VERSION) {
CompressedVSizeColumnarIntsSupplier scaleSupplier = CompressedVSizeColumnarIntsSupplier.fromByteBuffer(
buffer,
IndexIO.BYTE_ORDER);
V3CompressedVSizeColumnarMultiIntsSupplier magnitudeSupplier =
V3CompressedVSizeColumnarMultiIntsSupplier.fromByteBuffer(buffer, IndexIO.BYTE_ORDER);
return new CompressedBigDecimalColumnPartSupplier(scaleSupplier, magnitudeSupplier);
} else {
throw new IAE("Unknown version[%s]", versionFromBuffer);
}
}
@Override
public ComplexColumn get()
{
return new CompressedBigDecimalColumn(scaleSupplier.get(), magnitudeSupplier.get());
}
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.compressedbigdecimal;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import java.io.IOException;
/**
* CompressedBigDecimal json serializer.
*/
@SuppressWarnings("rawtypes")
public class CompressedBigDecimalJsonSerializer extends JsonSerializer<CompressedBigDecimal>
{
/* (non-Javadoc)
* @see JsonSerializer#serialize(java.lang.Object, com.fasterxml.jackson.core.JsonGenerator,
* com.fasterxml.jackson.databind.SerializerProvider)
*/
@Override
public void serialize(CompressedBigDecimal value, JsonGenerator jgen, SerializerProvider provider)
throws IOException
{
jgen.writeString(value.toString());
}
}

View File

@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.compressedbigdecimal;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.data.ArrayBasedIndexedInts;
import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSerializer;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.data.V3CompressedVSizeColumnarMultiIntsSerializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.Locale;
/**
* Column Serializer that understands converting CompressedBigDecimal to 4 byte long values for better storage.
*/
public class CompressedBigDecimalLongColumnSerializer implements GenericColumnSerializer<CompressedBigDecimal<?>>
{
private static final byte VERSION = CompressedBigDecimalColumnPartSupplier.VERSION;
/**
* Static constructor.
*
* @param segmentWriteOutMedium the peon
* @param filenameBase filename of the index
* @return a constructed AccumulatingBigDecimalLongColumnSerializer
*/
public static CompressedBigDecimalLongColumnSerializer create(
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase)
{
return new CompressedBigDecimalLongColumnSerializer(
CompressedVSizeColumnarIntsSerializer.create(
"dummy",
segmentWriteOutMedium,
String.format(Locale.ROOT, "%s.scale", filenameBase),
16,
CompressionStrategy.LZ4),
V3CompressedVSizeColumnarMultiIntsSerializer.create(
"dummy",
segmentWriteOutMedium,
String.format(Locale.ROOT, "%s.magnitude", filenameBase),
Integer.MAX_VALUE,
CompressionStrategy.LZ4));
}
private final CompressedVSizeColumnarIntsSerializer scaleWriter;
private final V3CompressedVSizeColumnarMultiIntsSerializer magnitudeWriter;
/**
* Constructor.
*
* @param scaleWriter the scale writer
* @param magnitudeWriter the magnitude writer
*/
public CompressedBigDecimalLongColumnSerializer(
CompressedVSizeColumnarIntsSerializer scaleWriter,
V3CompressedVSizeColumnarMultiIntsSerializer magnitudeWriter
)
{
this.scaleWriter = scaleWriter;
this.magnitudeWriter = magnitudeWriter;
}
@Override
public void open() throws IOException
{
scaleWriter.open();
magnitudeWriter.open();
}
@Override
public void serialize(ColumnValueSelector<? extends CompressedBigDecimal<?>> obj) throws IOException
{
CompressedBigDecimal<?> abd = obj.getObject();
int[] array = new int[abd.getArraySize()];
for (int ii = 0; ii < abd.getArraySize(); ++ii) {
array[ii] = abd.getArrayEntry(ii);
}
scaleWriter.addValue(abd.getScale());
magnitudeWriter.addValues(new ArrayBasedIndexedInts(array));
}
@Override
public long getSerializedSize() throws IOException
{
return 1 + // version
scaleWriter.getSerializedSize() +
magnitudeWriter.getSerializedSize();
}
@Override
public void writeTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
{
channel.write(ByteBuffer.wrap(new byte[] {VERSION}));
scaleWriter.writeTo(channel, smoosher);
magnitudeWriter.writeTo(channel, smoosher);
}
}

View File

@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.compressedbigdecimal;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.serde.ComplexMetricExtractor;
import org.apache.druid.segment.serde.ComplexMetricSerde;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
/**
* ComplexMetricSerde that understands how to read and write scaled longs.
*/
public class CompressedBigDecimalMetricSerde extends ComplexMetricSerde
{
private CompressedBigDecimalObjectStrategy strategy = new CompressedBigDecimalObjectStrategy();
/* (non-Javadoc)
* @see ComplexMetricSerde#getTypeName()
*/
@Override
public String getTypeName()
{
return CompressedBigDecimalModule.COMPRESSED_BIG_DECIMAL;
}
@Override
public ComplexMetricExtractor<CompressedBigDecimal<?>> getExtractor()
{
return new ComplexMetricExtractor<CompressedBigDecimal<?>>()
{
@SuppressWarnings("unchecked")
@Override
public Class<CompressedBigDecimal<?>> extractedClass()
{
return (Class<CompressedBigDecimal<?>>) (Class<?>) CompressedBigDecimal.class;
}
@Override
public CompressedBigDecimal<?> extractValue(InputRow inputRow, String metricName)
{
Object rawMetric = inputRow.getRaw(metricName);
if (rawMetric == null) {
return null;
} else if (rawMetric instanceof BigDecimal) {
return new ArrayCompressedBigDecimal((BigDecimal) rawMetric);
} else if (rawMetric instanceof String) {
return new ArrayCompressedBigDecimal(new BigDecimal((String) rawMetric));
} else if (rawMetric instanceof CompressedBigDecimal<?>) {
return (CompressedBigDecimal<?>) rawMetric;
} else {
throw new ISE("Unknown extraction value type: [%s]", rawMetric.getClass().getSimpleName());
}
}
};
}
/* (non-Javadoc)
* @see ComplexMetricSerde#deserializeColumn(java.nio.ByteBuffer, org.apache.druid.segment.column.ColumnBuilder)
*/
@Override
public void deserializeColumn(ByteBuffer buffer, ColumnBuilder builder)
{
builder.setComplexColumnSupplier(
CompressedBigDecimalColumnPartSupplier.fromByteBuffer(buffer));
}
/* (non-Javadoc)
* @see org.apache.druid.segment.serde.ComplexMetricSerde#getSerializer(org.apache.druid.segment.data.IOPeon,
* java.lang.String)
*/
@Override
public CompressedBigDecimalLongColumnSerializer getSerializer(
SegmentWriteOutMedium segmentWriteOutMedium,
String column)
{
return CompressedBigDecimalLongColumnSerializer.create(segmentWriteOutMedium, column);
}
/* (non-Javadoc)
* @see ComplexMetricSerde#getObjectStrategy()
*/
@Override
public ObjectStrategy<CompressedBigDecimal<?>> getObjectStrategy()
{
return strategy;
}
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.compressedbigdecimal;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.segment.serde.ComplexMetrics;
import java.util.List;
/**
* Druid module for BigDecimal complex metrics and aggregator.
*/
public class CompressedBigDecimalModule implements DruidModule
{
public static final String COMPRESSED_BIG_DECIMAL = "compressedBigDecimal";
@Override
public void configure(Binder binder)
{
if (ComplexMetrics.getSerdeForType(COMPRESSED_BIG_DECIMAL) == null) {
ComplexMetrics.registerSerde(COMPRESSED_BIG_DECIMAL, new CompressedBigDecimalMetricSerde());
}
}
@Override
public List<Module> getJacksonModules()
{
return ImmutableList.of(
new SimpleModule("CompressedBigDecimalModule")
.registerSubtypes(
new NamedType(CompressedBigDecimalAggregatorFactory.class, COMPRESSED_BIG_DECIMAL)
)
.addSerializer(
CompressedBigDecimal.class,
new CompressedBigDecimalJsonSerializer()
)
);
}
}

View File

@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.compressedbigdecimal;
import org.apache.druid.segment.data.ObjectStrategy;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.IntBuffer;
/**
* Defines strategy on how to read and write data from deep storage.
*/
public class CompressedBigDecimalObjectStrategy implements ObjectStrategy<CompressedBigDecimal<?>>
{
/*
* (non-Javadoc)
*
* @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
*/
@SuppressWarnings({"rawtypes", "unchecked"})
@Override
public int compare(CompressedBigDecimal o1, CompressedBigDecimal o2)
{
return o1.compareTo(o2);
}
/*
* (non-Javadoc)
*
* @see org.apache.druid.segment.data.ObjectStrategy#getClazz()
*/
@SuppressWarnings({"unchecked", "rawtypes"})
@Override
public Class getClazz()
{
return CompressedBigDecimal.class;
}
/*
* (non-Javadoc)
*
* @see org.apache.druid.segment.data.ObjectStrategy#fromByteBuffer(java.nio.ByteBuffer, int)
*/
@Override
public CompressedBigDecimal<?> fromByteBuffer(ByteBuffer buffer, int numBytes)
{
ByteBuffer myBuf = buffer.slice();
myBuf.order(ByteOrder.LITTLE_ENDIAN);
IntBuffer intBuf = myBuf.asIntBuffer();
int scale = intBuf.get();
int[] array = new int[numBytes / 4 - 1];
intBuf.get(array);
return ArrayCompressedBigDecimal.wrap(array, scale);
}
/*
* (non-Javadoc)
*
* @see org.apache.druid.segment.data.ObjectStrategy#toBytes(java.lang.Object)
*/
@Override
public byte[] toBytes(CompressedBigDecimal<?> val)
{
ByteBuffer buf = ByteBuffer.allocate(4 * (val.getArraySize() + 1));
buf.order(ByteOrder.LITTLE_ENDIAN);
IntBuffer intBuf = buf.asIntBuffer();
intBuf.put(val.getScale());
for (int ii = 0; ii < val.getArraySize(); ++ii) {
intBuf.put(val.getArrayEntry(ii));
}
return buf.array();
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.compressedbigdecimal;
/**
* Represents an operation that accepts an object-valued and two int-valued arguments, and returns no result.
* This is the (reference, int, int) specialization of BiConsumer. Unlike most other functional interfaces,
* ObjBiIntConsumer is expected to operate via side-effects
*
* @param <T> Type of Object for first argument
*/
@FunctionalInterface
public interface ObjBiIntConsumer<T>
{
/**
* Performs this operation on the given arguments.
*
* @param t object arg
* @param val1 first int argument
* @param val2 second int argument
*/
void accept(T t, int val1, int val2);
}

View File

@ -0,0 +1,176 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.compressedbigdecimal;
import org.apache.druid.segment.data.IndexedInts;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.function.ToIntBiFunction;
/**
* Utility opertaions for accumlation.
*/
public class Utils
{
/**
* Accumulate (add) the passed in value into the current total. This
* modifies the value of the current object. The scale of the BigDecimal is adjusted to match
* the current accumulating scale. If the value being accumulated has a larger underlying array
* than this value (the result), then the higher order bits are dropped, similar to
* what happens when adding a long to an int and storing the result in an int.
*
* @param <S> Type of CompressedBigDecimal into which to accumulate
* @param lhs The object into which to accumulate
* @param rhs The object to accumulate
* @return a reference to <b>this</b>
*/
public static <S extends CompressedBigDecimal<S>>
CompressedBigDecimal<S> accumulate(CompressedBigDecimal<S> lhs, BigDecimal rhs)
{
CompressedBigDecimal<ArrayCompressedBigDecimal> abd =
new ArrayCompressedBigDecimal(rhs.setScale(lhs.getScale()));
return lhs.accumulate(abd);
}
/**
* Accumulate (add) the passed in value into the current total. This
* modifies the value of the current object. Accumulation requires that
* the two numbers have the same scale, but does not require that they are
* of the same size. If the value being accumulated has a larger underlying array
* than this value (the result), then the higher order bits are dropped, similar to
* what happens when adding a long to an int and storing the result in an int.
*
* @param <S> Type of CompressedBigDecimal into which to accumulate
* @param lhs The object into which to accumulate
* @param rhs The object to accumulate
* @param rhsScale The scale to apply to the long being accumulated
* @return a reference to <b>this</b>
*/
public static <S extends CompressedBigDecimal<S>>
CompressedBigDecimal<S> accumulate(CompressedBigDecimal<S> lhs, long rhs, int rhsScale)
{
CompressedBigDecimal<ArrayCompressedBigDecimal> abd = new ArrayCompressedBigDecimal(rhs, rhsScale);
return lhs.accumulate(abd);
}
/**
* Accumulate using IndexedInts read from Druid's segment file.
*
* @param <S> Type of CompressedBigDecimal into which to accumulate
* @param lhs The object into which to accumulate
* @param rhs IndexedInts representing array of magnitude values
* @param rhsScale the scale
* @return a reference to <b>this</b>
*/
public static <S extends CompressedBigDecimal<S>>
CompressedBigDecimal<S> accumulate(CompressedBigDecimal<S> lhs, IndexedInts rhs, int rhsScale)
{
if (rhs.size() > lhs.getArraySize()) {
throw new IllegalArgumentException("Right hand side too big to fit in the result value");
}
CompressedBigDecimal.internalAdd(lhs.getArraySize(), lhs, CompressedBigDecimal::getArrayEntry,
CompressedBigDecimal::setArrayEntry, rhs.size(), rhs, IndexedInts::get);
return lhs;
}
/**
* Accumulate using ByteBuffers for Druid BufferAggregator.
*
* @param buf The byte buffer that containes the result to accumlate into
* @param pos The initial position within the buffer
* @param lhsSize The array size of the left
* @param lhsScale The scale of the left
* @param rhs the right side to accumlate
*/
public static void accumulate(ByteBuffer buf, int pos, int lhsSize, int lhsScale, CompressedBigDecimal<?> rhs)
{
if (rhs.getArraySize() > lhsSize) {
throw new IllegalArgumentException("Right hand side too big to fit in the result value");
}
BufferAccessor accessor = BufferAccessor.prepare(pos);
CompressedBigDecimal.internalAdd(lhsSize, buf, accessor, accessor,
rhs.getArraySize(), rhs, CompressedBigDecimal::getArrayEntry);
}
/**
* Returns a {@code CompressedBigDecimal} whose scale is moderated as per the default scale.
*
* @param <S> Type of CompressedBigDecimal to scale
* @param val The value to scale up
* @return Scaled up compressedBigDecimal
*/
public static <S extends CompressedBigDecimal<S>>
CompressedBigDecimal<ArrayCompressedBigDecimal> scaleUp(CompressedBigDecimal<S> val)
{
return new ArrayCompressedBigDecimal(
val.toBigDecimal().setScale(CompressedBigDecimalAggregatorFactory.DEFAULT_SCALE, BigDecimal.ROUND_UP)
);
}
/**
* Helper class that maintains a cache of thread local objects that can be used to access
* a ByteBuffer in {@link Utils#accumulate(ByteBuffer, int, int, int, CompressedBigDecimal)}.
*/
private static class BufferAccessor implements ToIntBiFunction<ByteBuffer, Integer>, ObjBiIntConsumer<ByteBuffer>
{
private static ThreadLocal<BufferAccessor> cache = new ThreadLocal<BufferAccessor>()
{
@Override
protected BufferAccessor initialValue()
{
return new BufferAccessor();
}
};
private int position = 0;
/**
* Initialized the BufferAccessor with the location that should be used for access.
*
* @param position position within the buffer
* @return An initialized BufferAccessor
*/
public static BufferAccessor prepare(int position)
{
BufferAccessor accessor = cache.get();
accessor.position = position;
return accessor;
}
/* (non-Javadoc)
* @see org.apache.druid.compressedbigdecimal.ObjBiIntConsumer#accept(java.lang.Object, int, int)
*/
@Override
public void accept(ByteBuffer buf, int idx, int val)
{
buf.putInt(position + idx * Integer.BYTES, val);
}
/* (non-Javadoc)
* @see java.util.function.ToIntBiFunction#applyAsInt(java.lang.Object, java.lang.Object)
*/
@Override
public int applyAsInt(ByteBuffer buf, Integer idx)
{
return buf.getInt(position + idx * Integer.BYTES);
}
}
}

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.druid.compressedbigdecimal.CompressedBigDecimalModule

View File

@ -0,0 +1,181 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.compressedbigdecimal;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.data.ColumnarInts;
import org.apache.druid.segment.data.ColumnarMultiInts;
import org.apache.druid.segment.data.ReadableOffset;
import org.easymock.EasyMock;
import org.junit.Test;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.Arrays;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
public class AggregatorCombinerFactoryTest
{
/**
* Test method for {@link CompressedBigDecimalColumn}.
*/
@Test
public void testCompressedBigDecimalColumn()
{
ColumnarMultiInts cmi = EasyMock.createMock(ColumnarMultiInts.class);
ColumnarInts ci = EasyMock.createMock(ColumnarInts.class);
ReadableOffset ro = EasyMock.createMock(ReadableOffset.class);
CompressedBigDecimalColumn cbr = new CompressedBigDecimalColumn(ci, cmi);
assertEquals(CompressedBigDecimalModule.COMPRESSED_BIG_DECIMAL, cbr.getTypeName());
assertEquals(0, cbr.getLength());
assertEquals(CompressedBigDecimalColumn.class, cbr.getClazz());
assertNotNull(cbr.makeColumnValueSelector(ro));
}
/**
* Test method for {@link CompressedBigDecimalAggregatorFactory}.
*/
@Test
public void testCompressedBigDecimalAggregatorFactory()
{
CompressedBigDecimalAggregatorFactory cf = new CompressedBigDecimalAggregatorFactory("name", "fieldName", 9, 0);
assertEquals("CompressedBigDecimalAggregatorFactory{name='name', type='compressedBigDecimal', fieldName='fieldName', requiredFields='[fieldName]', size='9', scale='0'}", cf.toString());
assertNotNull(cf.getCacheKey());
assertNull(cf.deserialize(null));
assertEquals("5", cf.deserialize(new BigDecimal(5)).toString());
assertEquals("5", cf.deserialize(5d).toString());
assertEquals("5", cf.deserialize("5").toString());
assertEquals("[CompressedBigDecimalAggregatorFactory{name='fieldName', type='compressedBigDecimal', fieldName='fieldName', requiredFields='[fieldName]', size='9', scale='0'}]", Arrays.toString(cf.getRequiredColumns().toArray()));
assertEquals("0", cf.combine(null, null).toString());
assertEquals("4", cf.combine(new BigDecimal(4), null).toString());
assertEquals("4", cf.combine(null, new BigDecimal(4)).toString());
assertEquals("8", cf.combine(new ArrayCompressedBigDecimal(new BigDecimal(4)), new ArrayCompressedBigDecimal(new BigDecimal(4))).toString());
}
/**
* Test method for {@link CompressedBigDecimalAggregatorFactory#deserialize(Object)}.
*/
@Test (expected = RuntimeException.class)
public void testCompressedBigDecimalAggregatorFactoryDeserialize()
{
CompressedBigDecimalAggregatorFactory cf = new CompressedBigDecimalAggregatorFactory("name", "fieldName", 9, 0);
cf.deserialize(5);
}
/**
* Test method for {@link CompressedBigDecimalBufferAggregator#getFloat(ByteBuffer, int)}
*/
@Test (expected = UnsupportedOperationException.class)
public void testCompressedBigDecimalBufferAggregatorGetFloat()
{
ColumnValueSelector cs = EasyMock.createMock(ColumnValueSelector.class);
ByteBuffer bbuf = ByteBuffer.allocate(10);
CompressedBigDecimalBufferAggregator ca = new CompressedBigDecimalBufferAggregator(4, 0, cs);
ca.getFloat(bbuf, 0);
}
/**
* Test method for {@link CompressedBigDecimalBufferAggregator#getLong(ByteBuffer, int)}
*/
@Test (expected = UnsupportedOperationException.class)
public void testCompressedBigDecimalBufferAggregatorGetLong()
{
ColumnValueSelector cs = EasyMock.createMock(ColumnValueSelector.class);
ByteBuffer bbuf = ByteBuffer.allocate(10);
CompressedBigDecimalBufferAggregator ca = new CompressedBigDecimalBufferAggregator(4, 0, cs);
ca.getLong(bbuf, 0);
}
/**
* Test method for {@link CompressedBigDecimalAggregateCombiner#getObject()}
*/
@Test
public void testCompressedBigDecimalAggregateCombinerGetObject()
{
CompressedBigDecimalAggregateCombiner cc = new CompressedBigDecimalAggregateCombiner();
CompressedBigDecimal c = cc.getObject();
assertSame(null, c);
}
/**
* Test method for {@link CompressedBigDecimalAggregateCombiner#getClass()}
*/
@Test
public void testCompressedBigDecimalAggregateCombinerClassofObject()
{
CompressedBigDecimalAggregateCombiner cc = new CompressedBigDecimalAggregateCombiner();
assertSame(CompressedBigDecimalAggregateCombiner.class, cc.getClass());
}
/**
* Test method for {@link CompressedBigDecimalAggregateCombiner#getLong()}
*/
@Test(expected = UnsupportedOperationException.class)
public void testCompressedBigDecimalAggregateCombinerGetLong()
{
CompressedBigDecimalAggregateCombiner cc = new CompressedBigDecimalAggregateCombiner();
cc.getLong();
}
/**
* Test method for {@link CompressedBigDecimalAggregateCombiner#getFloat()}
*/
@Test(expected = UnsupportedOperationException.class)
public void testCompressedBigDecimalAggregateCombinerGetFloat()
{
CompressedBigDecimalAggregateCombiner cc = new CompressedBigDecimalAggregateCombiner();
cc.getFloat();
}
/**
* Test method for {@link CompressedBigDecimalAggregateCombiner#getDouble()}
*/
@Test(expected = UnsupportedOperationException.class)
public void testCompressedBigDecimalAggregateCombinerGetDouble()
{
CompressedBigDecimalAggregateCombiner cc = new CompressedBigDecimalAggregateCombiner();
cc.getDouble();
}
/**
* Test method for {@link CompressedBigDecimalAggregator#getFloat()}
*/
@Test(expected = UnsupportedOperationException.class)
public void testCompressedBigDecimalAggregatorGetFloat()
{
ColumnValueSelector cv = EasyMock.createMock(ColumnValueSelector.class);
CompressedBigDecimalAggregator cc = new CompressedBigDecimalAggregator(2, 0, cv);
cc.getFloat();
}
/**
* Test method for {@link CompressedBigDecimalAggregator#getLong()}
*/
@Test(expected = UnsupportedOperationException.class)
public void testCompressedBigDecimalAggregatorGetLong()
{
ColumnValueSelector cv = EasyMock.createMock(ColumnValueSelector.class);
CompressedBigDecimalAggregator cc = new CompressedBigDecimalAggregator(2, 0, cv);
cc.getLong();
}
}

View File

@ -0,0 +1,465 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.compressedbigdecimal;
import org.junit.Test;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import static org.apache.druid.compressedbigdecimal.Utils.accumulate;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
/**
* Unit tests for CompressedBigDecimal.
*/
public class ArrayCompressedBigDecimalTest
{
/**
* Test method for {@link ArrayCompressedBigDecimal#ArrayCompressedBigDecimal(long, int)}.
*/
@Test
public void testLongConstructorZero()
{
// Validate simple 0 case with longs.
ArrayCompressedBigDecimal d = new ArrayCompressedBigDecimal(0, 0);
d.reset();
assertEquals(0, d.getScale());
int[] array = d.getArray();
assertEquals(2, array.length);
assertEquals(0, array[0]);
assertEquals(0, array[1]);
}
/**
* Test method for {@link ArrayCompressedBigDecimal#ArrayCompressedBigDecimal}.
*/
@Test
public void testLongConstructorPositive()
{
// Validate positive number that doesn't flow into the next int
ArrayCompressedBigDecimal d = new ArrayCompressedBigDecimal(Integer.MAX_VALUE, 9);
ArrayCompressedBigDecimal dl = d;
assertEquals(9, d.getScale());
int[] array = d.getArray();
assertEquals(2, array.length);
assertEquals(Integer.MAX_VALUE, array[0]);
assertEquals(0, array[1]);
assertEquals(0, d.compareTo(new ArrayCompressedBigDecimal(Integer.MAX_VALUE, 9)));
assertEquals(0, d.compareTo(dl));
}
/**
* Test method for {@link ArrayCompressedBigDecimal#ArrayCompressedBigDecimal}.
*/
@Test
public void testLongConstructorNegative()
{
// validate negative number correctly fills in upper bits.
ArrayCompressedBigDecimal d = new ArrayCompressedBigDecimal(Integer.MIN_VALUE, 5);
assertEquals(5, d.getScale());
int[] array = d.getArray();
assertEquals(2, array.length);
assertEquals(Integer.MIN_VALUE, array[0]);
assertEquals(-1, array[1]);
assertEquals(-21475, d.intValue());
assertEquals(-21475, d.longValue());
assertEquals(-21475, d.shortValue());
}
/**
* Test method for {@link ArrayCompressedBigDecimal#ArrayCompressedBigDecimal}.
*/
@Test
public void testBigDecimalConstructorZero()
{
// simple zero case to test short circuiting
BigDecimal bd = new BigDecimal(0);
ArrayCompressedBigDecimal d = new ArrayCompressedBigDecimal(bd);
assertEquals(0, d.getScale());
int[] array = d.getArray();
assertEquals(1, array.length);
assertEquals(0, array[0]);
assertEquals("0", d.toString());
assertEquals(0, d.signum());
}
/**
* Test method for {@link ArrayCompressedBigDecimal#ArrayCompressedBigDecimal}.
*/
@Test
public void testBigDecimalConstructorSmallPositive()
{
// simple one int positive example
BigDecimal bd = new BigDecimal(Integer.MAX_VALUE).scaleByPowerOfTen(-9);
ArrayCompressedBigDecimal d = new ArrayCompressedBigDecimal(bd);
assertEquals(9, d.getScale());
int[] array = d.getArray();
assertEquals(1, array.length);
assertEquals(Integer.MAX_VALUE, array[0]);
}
/**
* Test method for {@link ArrayCompressedBigDecimal#ArrayCompressedBigDecimal}.
*/
@Test
public void testBigDecimalConstructorSmallNegative()
{
// simple one int negative example
BigDecimal bd = new BigDecimal(Integer.MIN_VALUE).scaleByPowerOfTen(-5);
ArrayCompressedBigDecimal d = new ArrayCompressedBigDecimal(bd);
assertEquals(5, d.getScale());
int[] array = d.getArray();
assertEquals(1, array.length);
assertEquals(Integer.MIN_VALUE, array[0]);
}
/**
* Test method for {@link ArrayCompressedBigDecimal#ArrayCompressedBigDecimal}.
*/
@Test
public void testBigDecimalConstructorLargePositive()
{
// simple two int positive example
BigDecimal bd = new BigDecimal(Long.MAX_VALUE).scaleByPowerOfTen(-9);
ArrayCompressedBigDecimal d = new ArrayCompressedBigDecimal(bd);
assertEquals(9, d.getScale());
int[] array = d.getArray();
assertEquals(2, array.length);
assertEquals(-1, array[0]);
assertEquals(Integer.MAX_VALUE, array[1]);
}
/**
* Test method for {@link ArrayCompressedBigDecimal#ArrayCompressedBigDecimal}.
*/
@Test
public void testBigDecimalConstructorLargeNegative()
{
// simple two int negative example
BigDecimal bd = new BigDecimal(Long.MIN_VALUE).scaleByPowerOfTen(-5);
ArrayCompressedBigDecimal d = new ArrayCompressedBigDecimal(bd);
assertEquals(5, d.getScale());
int[] array = d.getArray();
assertEquals(2, array.length);
assertEquals(0, array[0]);
assertEquals(Integer.MIN_VALUE, array[1]);
}
/**
* Test method for {@link ArrayCompressedBigDecimal#ArrayCompressedBigDecimal}.
*/
@Test
public void testBigDecimalConstructorUnevenMultiplePositive()
{
// test positive when number of bytes in BigDecimal isn't an even multiple of sizeof(int)
BigDecimal bd = new BigDecimal(new BigInteger(1, new byte[] {0x7f, -1, -1, -1, -1}));
ArrayCompressedBigDecimal d = new ArrayCompressedBigDecimal(bd);
assertEquals(0, d.getScale());
int[] array = d.getArray();
assertEquals(2, array.length);
assertEquals(-1, array[0]);
assertEquals(0x7f, array[1]);
}
/**
* Test method for {@link ArrayCompressedBigDecimal#ArrayCompressedBigDecimal}.
*/
@Test
public void testBigDecimalConstructorUnevenMultipleNegative()
{
// test negative when number of bytes in BigDecimal isn't an even multiple of sizeof(int)
BigDecimal bd = new BigDecimal(new BigInteger(-1, new byte[] {Byte.MIN_VALUE, 0, 0, 0, 0}));
ArrayCompressedBigDecimal d = new ArrayCompressedBigDecimal(bd);
assertEquals(0, d.getScale());
int[] array = d.getArray();
assertEquals(2, array.length);
assertEquals(0, array[0]);
assertEquals(Byte.MIN_VALUE, array[1]);
}
/**
* Test method for {@link ArrayCompressedBigDecimal#ArrayCompressedBigDecimal(CompressedBigDecimal)}.
*/
@Test
public void testCopyConstructor()
{
BigDecimal bd = new BigDecimal(new BigInteger(1, new byte[] {0x7f, -1, -1, -1, -1}));
ArrayCompressedBigDecimal d = new ArrayCompressedBigDecimal(bd);
ArrayCompressedBigDecimal d2 = new ArrayCompressedBigDecimal(d);
assertEquals(d.getScale(), d2.getScale());
assertArrayEquals(d.getArray(), d2.getArray());
assertNotSame(d.getArray(), d2.getArray());
}
/**
* Test method for {@link ArrayCompressedBigDecimal#wrap(int[], int)}.
*/
@Test
public void testWrap()
{
int[] array = new int[] {Integer.MAX_VALUE, -1};
ArrayCompressedBigDecimal bd = ArrayCompressedBigDecimal.wrap(array, 0);
assertSame(array, bd.getArray());
assertEquals(0, bd.getScale());
}
/**
* Test method for {@link ArrayCompressedBigDecimal#allocate(int, int)}.
*/
@Test
public void testAllocate()
{
ArrayCompressedBigDecimal bd = ArrayCompressedBigDecimal.allocate(2, 5);
assertEquals(5, bd.getScale());
assertEquals(2, bd.getArray().length);
}
/**
* Test method for {@link ArrayCompressedBigDecimal#accumulate(CompressedBigDecimal)}.
*/
@Test
public void testSimpleAccumulate()
{
ArrayCompressedBigDecimal bd = ArrayCompressedBigDecimal.allocate(2, 0);
ArrayCompressedBigDecimal add = ArrayCompressedBigDecimal.wrap(new int[] {0x00000001, 0}, 0);
bd.accumulate(add);
assertArrayEquals(new int[] {1, 0}, bd.getArray());
bd.accumulate(add);
assertArrayEquals(new int[] {2, 0}, bd.getArray());
}
/**
* Test method for {@link ArrayCompressedBigDecimal#accumulate(CompressedBigDecimal)}.
*/
@Test
public void testSimpleAccumulateOverflow()
{
ArrayCompressedBigDecimal bd = ArrayCompressedBigDecimal.wrap(new int[] {0x80000000, 0}, 0);
ArrayCompressedBigDecimal add = ArrayCompressedBigDecimal.wrap(new int[] {0x7fffffff, 0}, 0);
ArrayCompressedBigDecimal add1 = ArrayCompressedBigDecimal.wrap(new int[] {0x00000001, 0}, 0);
bd.accumulate(add);
assertArrayEquals(new int[] {0xffffffff, 0}, bd.getArray());
bd.accumulate(add1);
assertArrayEquals(new int[] {0, 1}, bd.getArray());
}
/**
* Test method for {@link ArrayCompressedBigDecimal#accumulate(CompressedBigDecimal)}.
*/
@Test
public void testSimpleAccumulateUnderflow()
{
ArrayCompressedBigDecimal bd = ArrayCompressedBigDecimal.wrap(new int[] {0, 1}, 0);
ArrayCompressedBigDecimal add = ArrayCompressedBigDecimal.wrap(new int[] {-1, -1}, 0);
bd.accumulate(add);
assertArrayEquals(new int[] {0xffffffff, 0}, bd.getArray());
}
/**
* Test method for {@link ArrayCompressedBigDecimal#accumulate(CompressedBigDecimal)}.
*/
@Test
public void testUnevenAccumulateUnderflow()
{
ArrayCompressedBigDecimal bd = ArrayCompressedBigDecimal.wrap(new int[] {0, 1}, 0);
ArrayCompressedBigDecimal add = ArrayCompressedBigDecimal.wrap(new int[] {-1}, 0);
bd.accumulate(add);
assertArrayEquals(new int[] {0xffffffff, 0}, bd.getArray());
}
/**
* Test method for {@link ArrayCompressedBigDecimal#accumulate(CompressedBigDecimal)}.
*/
@Test
public void testUnevenAccumulateOverflow()
{
ArrayCompressedBigDecimal bd = ArrayCompressedBigDecimal.wrap(new int[] {0xffffffff, 1}, 0);
ArrayCompressedBigDecimal add = ArrayCompressedBigDecimal.wrap(new int[] {1}, 0);
bd.accumulate(add);
assertArrayEquals(new int[] {0, 2}, bd.getArray());
}
/**
* Test method for {@link ArrayCompressedBigDecimal#accumulate(CompressedBigDecimal)}.
*/
@Test(expected = IllegalArgumentException.class)
public void testUnevenAccumulateOverflowWithTruncate()
{
ArrayCompressedBigDecimal bd = ArrayCompressedBigDecimal.wrap(new int[] {Integer.MAX_VALUE}, 0);
ArrayCompressedBigDecimal add = ArrayCompressedBigDecimal.wrap(new int[] {1, 1}, 0);
bd.accumulate(add);
}
/**
* Test method for {@link ArrayCompressedBigDecimal#accumulate(CompressedBigDecimal)}.
*/
@Test(expected = IllegalArgumentException.class)
public void testAccumulateScaleMismatch()
{
ArrayCompressedBigDecimal bd = ArrayCompressedBigDecimal.allocate(2, 1);
ArrayCompressedBigDecimal add = new ArrayCompressedBigDecimal(1, 0);
bd.accumulate(add);
}
/**
* Test method for {@link ArrayCompressedBigDecimal#toBigDecimal()}.
*/
@Test
public void testToBigDecimal()
{
ArrayCompressedBigDecimal bd = ArrayCompressedBigDecimal.wrap(new int[] {1}, 0);
assertEquals(BigDecimal.ONE, bd.toBigDecimal());
bd = ArrayCompressedBigDecimal.wrap(new int[] {Integer.MAX_VALUE}, 0);
assertEquals(new BigDecimal(Integer.MAX_VALUE), bd.toBigDecimal());
bd = ArrayCompressedBigDecimal.wrap(new int[] {0}, 0);
assertEquals(BigDecimal.ZERO, bd.toBigDecimal());
bd = ArrayCompressedBigDecimal.wrap(new int[] {0, 0}, 0);
assertEquals(BigDecimal.ZERO, bd.toBigDecimal());
bd = new ArrayCompressedBigDecimal(-1, 9);
assertEquals(new BigDecimal(-1).scaleByPowerOfTen(-9), bd.toBigDecimal());
bd = ArrayCompressedBigDecimal.wrap(new int[] {1410065408, 2}, 9);
assertEquals(new BigDecimal(10).setScale(9), bd.toBigDecimal());
}
/**
* Test method for {@link ByteBufferCompressedBigDecimal()}.
*/
@Test
public void testBigDecimalConstructorwithByteBuffer()
{
BigDecimal bd = new BigDecimal(new BigInteger(1, new byte[] {0x7f, -1, -1}));
ArrayCompressedBigDecimal d = new ArrayCompressedBigDecimal(bd);
ByteBuffer buf = ByteBuffer.allocate(4);
CompressedBigDecimal cbd = new ByteBufferCompressedBigDecimal(buf, 0, d);
assertEquals(0, cbd.getScale());
assertEquals(8388607, cbd.intValue());
assertEquals(new Long(8388607L).doubleValue(), cbd.floatValue(), 0.001);
assertEquals(new Long(8388607L).doubleValue(), cbd.doubleValue(), 0.001);
}
/**
* Test method for {@link ArrayCompressedBigDecimal#setArrayEntry
*/
@Test
public void testSetArrayEntry()
{
BigDecimal bd = new BigDecimal(new BigInteger(1, new byte[] {0x7f, -1, -1}));
ArrayCompressedBigDecimal d = new ArrayCompressedBigDecimal(bd);
d.setArrayEntry(0, 2);
assertEquals(2, d.intValue());
}
/**
* Test method for {@link ByteBufferCompressedBigDecimal#copyToBuffer(ByteBuffer, int, int, CompressedBigDecimal)}
*/
@Test
public void testCopyToBuffer()
{
ByteBuffer bb = ByteBuffer.wrap(new byte[] {0, 0, 0, 0, 0, 0, 0, 4});
ByteBufferCompressedBigDecimal bbdl = new ByteBufferCompressedBigDecimal(bb, 0, 1, 0);
bbdl.setArrayEntry(0, 2);
assertEquals(2, bbdl.intValue());
}
/**
* Test method for {@link Utils#accumulate(ByteBuffer, int, int, int, CompressedBigDecimal)}
*/
@Test(expected = IllegalArgumentException.class)
public void testUtilsAccumulateByteBuf()
{
BigDecimal bd = new BigDecimal(new BigInteger(1, new byte[] {0x7f, -1, -1}));
ArrayCompressedBigDecimal d = new ArrayCompressedBigDecimal(bd);
ByteBuffer buf = ByteBuffer.allocate(4);
accumulate(buf, 0, 1, 2, new ArrayCompressedBigDecimal(new BigDecimal(Long.MAX_VALUE)));
}
/**
* Test method for {@link Utils#accumulate(CompressedBigDecimal, long, int)}
*/
@Test(expected = IllegalArgumentException.class)
public void testUtilsAccumulateCbdWithExeception()
{
BigDecimal bd = new BigDecimal(new BigInteger("1"));
ArrayCompressedBigDecimal d = new ArrayCompressedBigDecimal(bd);
accumulate(d, 0L, 1);
}
/**
* Test method for {@link Utils#accumulate(CompressedBigDecimal, long, int)}
*/
@Test
public void testUtilsAccumulateCbd()
{
ArrayCompressedBigDecimal bd = ArrayCompressedBigDecimal.allocate(2, 0);
ArrayCompressedBigDecimal add = ArrayCompressedBigDecimal.wrap(new int[] {0x00000001, 0}, 0);
bd.accumulate(add);
accumulate(bd, 1, 0);
assertEquals("2", bd.toString());
CompressedBigDecimal x = accumulate(bd, new BigDecimal("2"));
assertEquals(4, x.intValue());
CompressedBigDecimalObjectStrategy c1 = new CompressedBigDecimalObjectStrategy();
c1.compare(bd, add);
}
/**
* Test method for {@link CompressedBigDecimalObjectStrategy
*/
@Test
public void testCompressedBigDecimalObjectStrategy()
{
ArrayCompressedBigDecimal bd;
ArrayCompressedBigDecimal acd = ArrayCompressedBigDecimal.wrap(new int[] {0x00000001, 0}, 0);
bd = acd;
CompressedBigDecimalObjectStrategy c1 = new CompressedBigDecimalObjectStrategy();
BigDecimal d = new BigDecimal(new BigInteger(1, new byte[] {0, 0, 1}));
ByteBuffer bb = ByteBuffer.wrap(new byte[] {0, 0, 0, 0, 0, 0, 0, 4});
CompressedBigDecimal cbl = c1.fromByteBuffer(bb, 8);
byte[] bf = c1.toBytes(bd);
ArrayCompressedBigDecimal cbd = new ArrayCompressedBigDecimal(new BigDecimal(new BigInteger(1, bf)));
assertEquals(67108864, cbl.intValue());
assertEquals(0, c1.compare(bd, acd));
assertEquals(0, cbd.intValue());
}
}

View File

@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.compressedbigdecimal;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.io.Resources;
import org.apache.druid.data.input.MapBasedRow;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.aggregation.AggregationTestHelper;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
import org.apache.druid.query.groupby.ResultRow;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
import static org.hamcrest.collection.IsMapContaining.hasEntry;
import static org.hamcrest.collection.IsMapWithSize.aMapWithSize;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
/**
* Unit tests for AccumulatingDecimalAggregator.
*/
@RunWith(Parameterized.class)
public class CompressedBigDecimalAggregatorGroupByTest
{
private final AggregationTestHelper helper;
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder(new File("target"));
/**
* Constructor.
*
* @param config config object
*/
public CompressedBigDecimalAggregatorGroupByTest(GroupByQueryConfig config)
{
CompressedBigDecimalModule module = new CompressedBigDecimalModule();
module.configure(null);
helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
module.getJacksonModules(), config, tempFolder);
}
/**
* Constructor feeder.
*
* @return constructors
*/
@Parameterized.Parameters(name = "{0}")
public static Collection<?> constructorFeeder()
{
final List<Object[]> constructors = new ArrayList<>();
for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) {
if ("v2ParallelCombine".equals(config.toString())) {
continue;
}
constructors.add(new Object[] {config});
}
return constructors;
}
/**
* Default setup of UTC timezone.
*/
@BeforeClass
public static void setupClass()
{
System.setProperty("user.timezone", "UTC");
}
/**
* ingetion method for all groupBy query.
*
* @throws IOException IOException
* @throws Exception Exception
*/
@Test
public void testIngestAndGroupByAllQuery() throws IOException, Exception
{
String groupByQueryJson = Resources.asCharSource(
this.getClass().getResource("/" + "bd_test_groupby_query.json"),
StandardCharsets.UTF_8
).read();
Sequence<ResultRow> seq = helper.createIndexAndRunQueryOnSegment(
this.getClass().getResourceAsStream("/" + "bd_test_data.csv"),
Resources.asCharSource(this.getClass().getResource(
"/" + "bd_test_data_parser.json"),
StandardCharsets.UTF_8
).read(),
Resources.asCharSource(
this.getClass().getResource("/" + "bd_test_aggregators.json"),
StandardCharsets.UTF_8
).read(),
0,
Granularities.NONE,
5,
groupByQueryJson
);
List<ResultRow> results = seq.toList();
assertThat(results, hasSize(1));
ResultRow row = results.get(0);
ObjectMapper mapper = helper.getObjectMapper();
GroupByQuery groupByQuery = mapper.readValue(groupByQueryJson, GroupByQuery.class);
MapBasedRow mapBasedRow = row.toMapBasedRow(groupByQuery);
Map<String, Object> event = mapBasedRow.getEvent();
assertEquals(new DateTime("2017-01-01T00:00:00Z", DateTimeZone.forTimeZone(TimeZone.getTimeZone("UTC"))), mapBasedRow.getTimestamp());
assertThat(event, aMapWithSize(1));
assertThat(event, hasEntry("revenue", new BigDecimal("15000000010.000000005")));
}
}

View File

@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.compressedbigdecimal;
import com.google.common.collect.Iterables;
import com.google.common.io.Resources;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.AggregationTestHelper;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.TimeZone;
import static org.hamcrest.collection.IsMapContaining.hasEntry;
import static org.hamcrest.collection.IsMapWithSize.aMapWithSize;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
/**
* Unit tests for AccumulatingDecimalAggregator.
*/
public class CompressedBigDecimalAggregatorTimeseriesTest
{
private final AggregationTestHelper helper;
static {
NullHandling.initializeForTests();
}
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder(new File("target"));
/**
* Constructor.
* * */
public CompressedBigDecimalAggregatorTimeseriesTest()
{
CompressedBigDecimalModule module = new CompressedBigDecimalModule();
module.configure(null);
helper = AggregationTestHelper.createTimeseriesQueryAggregationTestHelper(
module.getJacksonModules(), tempFolder);
}
/**
* Default setup of UTC timezone.
*/
@BeforeClass
public static void setupClass()
{
System.setProperty("user.timezone", "UTC");
}
/**
* ingetion method for all timeseries query.
*
* @throws IOException IOException
* @throws Exception Exception
*/
@Test
public void testIngestAndTimeseriesQuery() throws IOException, Exception
{
Sequence seq = helper.createIndexAndRunQueryOnSegment(
this.getClass().getResourceAsStream("/" + "bd_test_data.csv"),
Resources.asCharSource(getClass().getResource(
"/" + "bd_test_data_parser.json"),
StandardCharsets.UTF_8
).read(),
Resources.asCharSource(
this.getClass().getResource("/" + "bd_test_aggregators.json"),
StandardCharsets.UTF_8
).read(),
0,
Granularities.NONE,
5,
Resources.asCharSource(
this.getClass().getResource("/" + "bd_test_timeseries_query.json"),
StandardCharsets.UTF_8
).read()
);
TimeseriesResultValue result = ((Result<TimeseriesResultValue>) Iterables.getOnlyElement(seq.toList())).getValue();
Map<String, Object> event = result.getBaseObject();
assertEquals(new DateTime("2017-01-01T00:00:00Z", DateTimeZone.forTimeZone(TimeZone.getTimeZone("UTC"))),
((Result<TimeseriesResultValue>) Iterables.getOnlyElement(seq.toList())).getTimestamp());
assertThat(event, aMapWithSize(1));
assertThat(event, hasEntry("revenue", new BigDecimal("15000000010.000000005")));
}
/**
* Test using multiple segments.
*
* @throws Exception an exception
*/
@Test
public void testIngestMultipleSegmentsAndTimeseriesQuery() throws Exception
{
File segmentDir1 = tempFolder.newFolder();
helper.createIndex(
new File(this.getClass().getResource("/" + "bd_test_data.csv").getFile()),
Resources.asCharSource(this.getClass().getResource("/" + "bd_test_data_parser.json"),
StandardCharsets.UTF_8).read(),
Resources.asCharSource(this.getClass().getResource("/" + "bd_test_aggregators.json"),
StandardCharsets.UTF_8).read(),
segmentDir1,
0,
Granularities.NONE,
5);
File segmentDir2 = tempFolder.newFolder();
helper.createIndex(
new File(this.getClass().getResource("/" + "bd_test_zero_data.csv").getFile()),
Resources.asCharSource(this.getClass().getResource("/" + "bd_test_data_parser.json"),
StandardCharsets.UTF_8).read(),
Resources.asCharSource(this.getClass().getResource("/" + "bd_test_aggregators.json"),
StandardCharsets.UTF_8).read(),
segmentDir2,
0,
Granularities.NONE,
5);
Sequence seq = helper.runQueryOnSegments(
Arrays.asList(segmentDir1, segmentDir2),
Resources.asCharSource(
this.getClass().getResource("/" + "bd_test_timeseries_query.json"),
StandardCharsets.UTF_8
).read());
TimeseriesResultValue result = ((Result<TimeseriesResultValue>) Iterables.getOnlyElement(seq.toList())).getValue();
Map<String, Object> event = result.getBaseObject();
assertEquals(new DateTime("2017-01-01T00:00:00Z", DateTimeZone.forTimeZone(TimeZone.getTimeZone("UTC"))),
((Result<TimeseriesResultValue>) Iterables.getOnlyElement(seq.toList())).getTimestamp());
assertThat(event, aMapWithSize(1));
assertThat(event, hasEntry("revenue", new BigDecimal("15000000010.000000005")));
}
}

View File

@ -0,0 +1,9 @@
[
{
"type": "compressedBigDecimal",
"name": "revenue",
"fieldName": "revenue",
"scale": 9,
"size": 3
}
]

View File

@ -0,0 +1,6 @@
20170101,mail,0.0
20170101,sports,10.000000000
20170101,mail,-1.000000000
20170101,news,9999999999.000000000
20170101,sports,5000000000.000000005
20170101,mail,2.0
1 20170101 mail 0.0
2 20170101 sports 10.000000000
3 20170101 mail -1.000000000
4 20170101 news 9999999999.000000000
5 20170101 sports 5000000000.000000005
6 20170101 mail 2.0

View File

@ -0,0 +1,20 @@
{
"type": "string",
"parseSpec": {
"format": "csv",
"timestampSpec": {
"column": "timestamp",
"format": "yyyyMMdd"
},
"dimensionsSpec": {
"dimensions": [
"property"
]
},
"columns": [
"timestamp",
"property",
"revenue"
]
}
}

View File

@ -0,0 +1,19 @@
{
"queryType": "groupBy",
"dataSource": "test_datasource",
"granularity": "ALL",
"dimensions": [
],
"aggregations": [
{
"type": "compressedBigDecimal",
"name": "revenue",
"fieldName": "revenue",
"scale": 9,
"size": 3
}
],
"intervals": [
"2017-01-01T00:00:00.000Z/P1D"
]
}

View File

@ -0,0 +1,25 @@
{
"queryType": "timeseries",
"dataSource": "test_datasource",
"granularity": "ALL",
"aggregations": [
{
"type": "compressedBigDecimal",
"name": "revenue",
"fieldName": "revenue",
"scale": 9,
"size": 3
}
],
"filter": {
"type": "not",
"field": {
"type": "selector",
"dimension": "property",
"value": "XXX"
}
},
"intervals": [
"2017-01-01T00:00:00.000Z/P1D"
]
}

View File

@ -0,0 +1 @@
20170101,XXX,0.0
1 20170101 XXX 0.0

View File

@ -186,6 +186,7 @@
<module>extensions-core/druid-ranger-security</module>
<module>extensions-core/testing-tools</module>
<!-- Community extensions -->
<module>extensions-contrib/compressed-bigdecimal</module>
<module>extensions-contrib/influx-extensions</module>
<module>extensions-contrib/cassandra-storage</module>
<module>extensions-contrib/dropwizard-emitter</module>

View File

@ -1065,6 +1065,7 @@ com.example
common.runtime.properties
druid-aws-rds-extensions
druid-cassandra-storage
druid-compressed-bigdecimal
druid-distinctcount
druid-ec2-extensions
druid-kafka-extraction-namespace
@ -2069,6 +2070,15 @@ CDF
maxStreamLength
toString
100TB
- ../docs/development/extensions-contrib/compressed-big-decimal.md
compressedBigDecimal
limitSpec
metricsSpec
postAggregations
SaleAmount
IngestionSpec
druid-compressed-bigdecimal
doubleSum
- ../docs/querying/sql-functions.md
ANY_VALUE
APPROX_COUNT_DISTINCT_DS_HLL