Support ingestion of long/float dimensions (#3966)

* Support ingestion for long/float dimensions

* Allow non-arrays for key components in indexing type strategy interfaces

* Add numeric index merge test, fixes

* Docs for numeric dims at ingestion

* Remove unused import

* Adjust docs, add aggregate on numeric dims tests

* remove unused imports

* Throw exception for bitmap method on numerics

* Move typed selector creation to DimensionIndexer interface

* unused imports

* Fix

* Remove unused DimensionSpec from indexer methods, check for dims first in inc index storage adapter

* Remove spaces
Jonathan Wei 2017-02-28 19:04:41 -08:00 committed by Fangjin Yang
parent 5ccfdcc48b
commit a08660a9ca
39 changed files with 1775 additions and 147 deletions

View File

@ -165,7 +165,7 @@ public class BitmapIterationBenchmark
/**
* Benchmark of cumulative cost of construction of an immutable bitmap and then iterating over it. This is a pattern
* from realtime nodes, see {@link io.druid.segment.StringDimensionIndexer#fillBitmapsFromUnsortedEncodedArray}.
* from realtime nodes, see {@link io.druid.segment.StringDimensionIndexer#fillBitmapsFromUnsortedEncodedKeyComponent}.
* However, this benchmark is still approximate and should be improved to better reflect actual workloads of realtime nodes.
*/
@Benchmark

View File

@ -36,7 +36,32 @@ An example dataSchema is shown below:
"format" : "auto"
},
"dimensionsSpec" : {
"dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
"dimensions": [
"page",
"language",
"user",
"unpatrolled",
"newPage",
"robot",
"anonymous",
"namespace",
"continent",
"country",
"region",
"city",
{
"type": "long",
"name": "countryNum"
},
{
"type": "float",
"name": "userLatitude"
},
{
"type": "float",
"name": "userLongitude"
}
],
"dimensionExclusions" : [],
"spatialDimensions" : []
}
@ -169,10 +194,49 @@ handle all formatting decisions on their own, without using the ParseSpec.
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| dimensions | JSON String array | The names of the dimensions. If this is an empty array, Druid will treat all columns that are not timestamp or metric columns as dimension columns. | yes |
| dimensions | JSON array | A list of [dimension schema](#dimension-schema) objects or dimension names. Providing a name is equivalent to providing a String-typed dimension schema with the given name. If this is an empty array, Druid will treat all columns that are not timestamp or metric columns as String-typed dimension columns. | yes |
| dimensionExclusions | JSON String array | The names of dimensions to exclude from ingestion. | no (default == []) |
| spatialDimensions | JSON Object array | An array of [spatial dimensions](../development/geo.html) | no (default == []) |
#### Dimension Schema
A dimension schema specifies the type and name of a dimension to be ingested.
For example, the following `dimensionsSpec` section from a `dataSchema` ingests one column as Long (`countryNum`), two columns as Float (`userLatitude`, `userLongitude`), and the other columns as Strings:
```json
"dimensionsSpec" : {
"dimensions": [
"page",
"language",
"user",
"unpatrolled",
"newPage",
"robot",
"anonymous",
"namespace",
"continent",
"country",
"region",
"city",
{
"type": "long",
"name": "countryNum"
},
{
"type": "float",
"name": "userLatitude"
},
{
"type": "float",
"name": "userLongitude"
}
],
"dimensionExclusions" : [],
"spatialDimensions" : []
}
```
## GranularitySpec
The default granularity spec is `uniform`, and can be changed by setting the `type` field.

View File

@ -12,7 +12,7 @@ of OLAP data.
For more detailed information:
* Every row in Druid must have a timestamp. Data is always partitioned by time, and every query has a time filter. Query results can also be broken down by time buckets like minutes, hours, days, and so on.
* Dimensions are fields that can be filtered on or grouped by. They are always either single Strings or arrays of Strings.
* Dimensions are fields that can be filtered on or grouped by. They are always single Strings, arrays of Strings, single Longs, or single Floats.
* Metrics are fields that can be aggregated. They are often stored as numbers (integers or floats) but can also be stored as complex objects like HyperLogLog sketches or approximate histogram sketches.
Typical production tables (or datasources as they are known in Druid) have fewer than 100 dimensions and fewer
@ -20,6 +20,13 @@ than 100 metrics, although, based on user testimony, datasources with thousands
Below, we outline some best practices with schema design:
## Numeric dimensions
To ingest a column as a numeric-typed dimension (Long or Float), specify the column's type in the `dimensions` section of the `dimensionsSpec`. If the type is omitted, Druid ingests the column as the default String type.
See [Dimension Schema](../ingestion/index.html#dimension-schema) for more information.
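For readers assembling the spec programmatically rather than as JSON, here is a minimal sketch of an equivalent `DimensionsSpec` built in Java. It assumes the `StringDimensionSchema`, `LongDimensionSchema`, and `FloatDimensionSchema` classes in `io.druid.data.input.impl` and the three-argument `DimensionsSpec` constructor; treat it as illustrative, not as the canonical API.

```java
import com.google.common.collect.ImmutableList;
import io.druid.data.input.impl.DimensionSchema;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.FloatDimensionSchema;
import io.druid.data.input.impl.LongDimensionSchema;
import io.druid.data.input.impl.StringDimensionSchema;

public class NumericDimensionsSpecExample
{
  public static DimensionsSpec buildSpec()
  {
    // String-typed dimensions can be listed by name in JSON; numeric ones need
    // an explicit schema with a type of "long" or "float".
    final ImmutableList<DimensionSchema> dimensions = ImmutableList.<DimensionSchema>of(
        new StringDimensionSchema("page"),
        new StringDimensionSchema("language"),
        new LongDimensionSchema("countryNum"),     // ingested as a Long dimension
        new FloatDimensionSchema("userLatitude"),  // ingested as a Float dimension
        new FloatDimensionSchema("userLongitude")
    );
    // No dimension exclusions or spatial dimensions, matching the JSON example above.
    return new DimensionsSpec(dimensions, null, null);
  }
}
```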
## High cardinality dimensions (e.g. unique IDs)
In practice, we see that exact counts for unique IDs are often not required. Storing unique IDs as a column will kill
@ -77,6 +84,8 @@ a dimension that has been excluded, or a metric column as a dimension. It should
these segments will be slightly larger than if the list of dimensions was explicitly specified in lexicographic order. This limitation
does not impact query correctness, only storage requirements.
Note that when using schema-less ingestion, all dimensions will be ingested as String-typed dimensions.
## Including the same column as a dimension and a metric
One workflow with unique IDs is to be able to filter on a particular ID, while still being able to do fast unique counts on the ID column.

View File

@ -133,6 +133,8 @@ public class ScanQueryRunnerTest
ScanResultValue.timestampKey,
"market",
"quality",
"qualityLong",
"qualityFloat",
"qualityNumericString",
"placement",
"placementish",
@ -141,9 +143,7 @@ public class ScanQueryRunnerTest
"index",
"indexMin",
"indexMaxPlusTen",
"quality_uniques",
"qualityLong",
"qualityFloat"
"quality_uniques"
);
ScanQuery query = newTestQuery()
.intervals(I_0112_0114)

View File

@ -50,10 +50,15 @@ import java.io.IOException;
*
* The EncodedType and ActualType are Comparable because columns used as dimensions must have sortable values.
*
* @param <EncodedType> class of the encoded values
* @param <ActualType> class of the actual values
* @param <EncodedType> class of a single encoded value
* @param <EncodedKeyComponentType> A row key contains a component for each dimension; this parameter specifies the
* class of this dimension's key component. A column type that supports multivalue rows
* should use an array type (e.g., Strings would use int[]). Column types without
* multivalue row support should use single objects (e.g., Long, Float).
* @param <ActualType> class of a single actual value
*/
public interface DimensionHandler<EncodedType extends Comparable<EncodedType>, EncodedTypeArray, ActualType extends Comparable<ActualType>>
public interface DimensionHandler
<EncodedType extends Comparable<EncodedType>, EncodedKeyComponentType, ActualType extends Comparable<ActualType>>
{
/**
* Get the name of the column associated with this handler.
@ -66,12 +71,12 @@ public interface DimensionHandler<EncodedType extends Comparable<EncodedType>, E
/**
* Creates a new DimensionIndexer, a per-dimension object responsible for processing ingested rows in-memory, used by the
* IncrementalIndex. See {@link DimensionIndexer} interface for more information.
* Creates a new DimensionIndexer, a per-dimension object responsible for processing ingested rows in-memory, used
* by the IncrementalIndex. See {@link DimensionIndexer} interface for more information.
*
* @return A new DimensionIndexer object.
*/
DimensionIndexer<EncodedType, EncodedTypeArray, ActualType> makeIndexer();
DimensionIndexer<EncodedType, EncodedKeyComponentType, ActualType> makeIndexer();
/**
@ -88,7 +93,7 @@ public interface DimensionHandler<EncodedType extends Comparable<EncodedType>, E
* @return A new DimensionMergerV9 object.
*/
DimensionMergerV9<EncodedTypeArray> makeMerger(
DimensionMergerV9<EncodedKeyComponentType> makeMerger(
IndexSpec indexSpec,
File outDir,
IOPeon ioPeon,
@ -98,8 +103,8 @@ public interface DimensionHandler<EncodedType extends Comparable<EncodedType>, E
/**
* Creates a new DimensionMergerLegacy, a per-dimension object responsible for merging indexes/row data across segments
* and building the on-disk representation of a dimension. For use with IndexMerger only.
* Creates a new DimensionMergerLegacy, a per-dimension object responsible for merging indexes/row data across
* segments and building the on-disk representation of a dimension. For use with IndexMerger only.
*
* See {@link DimensionMergerLegacy} interface for more information.
*
@ -111,7 +116,7 @@ public interface DimensionHandler<EncodedType extends Comparable<EncodedType>, E
* @return A new DimensionMergerLegacy object.
*/
DimensionMergerLegacy<EncodedTypeArray> makeLegacyMerger(
DimensionMergerLegacy<EncodedKeyComponentType> makeLegacyMerger(
IndexSpec indexSpec,
File outDir,
IOPeon ioPeon,
@ -120,53 +125,55 @@ public interface DimensionHandler<EncodedType extends Comparable<EncodedType>, E
) throws IOException;
/**
* Given an array representing a single set of row value(s) for this dimension as an Object,
* return the length of the array after appropriate type-casting.
* Given a key component representing a single set of row value(s) for this dimension as an Object,
* return the length of the key component after appropriate type-casting.
*
* For example, a dictionary encoded String dimension would receive an int[] as an Object.
* For example, a dictionary encoded String dimension would receive an int[] as input to this method,
* while a Long numeric dimension would receive a single Long object (no multivalue support).
*
* @param dimVals Array of row values
* @param dimVals Values for this dimension from a row
* @return Size of dimVals
*/
int getLengthFromEncodedArray(EncodedTypeArray dimVals);
int getLengthOfEncodedKeyComponent(EncodedKeyComponentType dimVals);
/**
* Given two arrays representing sorted encoded row value(s), return the result of their comparison.
* Given two key components representing sorted encoded row value(s), return the result of their comparison.
*
* If the two arrays have different lengths, the shorter array should be ordered first in the comparison.
* If the two key components have different lengths, the shorter component should be ordered first in the comparison.
*
* Otherwise, this function should iterate through the array values and return the comparison of the first difference.
* Otherwise, this function should iterate through the key components and return the comparison of the
* first difference.
*
* @param lhs array of row values
* @param rhs array of row values
* For dimensions that do not support multivalue rows, lhs and rhs can be compared directly.
*
* @return integer indicating comparison result of arrays
* @param lhs key component from a row
* @param rhs key component from a row
*
* @return integer indicating comparison result of key components
*/
int compareSortedEncodedArrays(EncodedTypeArray lhs, EncodedTypeArray rhs);
int compareSortedEncodedKeyComponents(EncodedKeyComponentType lhs, EncodedKeyComponentType rhs);
/**
* Given two arrays representing sorted encoded row value(s), check that the two arrays have the same encoded values,
* or if the encoded values differ, that they translate into the same actual values, using the mappings
* provided by lhsEncodings and rhsEncodings (if applicable).
* Given two key components representing sorted encoded row value(s), check that the two key components
* have the same encoded values, or if the encoded values differ, that they translate into the same actual values,
* using the mappings provided by lhsEncodings and rhsEncodings (if applicable).
*
* If validation fails, this method should throw a SegmentValidationException.
*
* Used by IndexIO for validating segments.
*
* See StringDimensionHandler.validateSortedEncodedArrays() for a reference implementation.
* See StringDimensionHandler.validateSortedEncodedKeyComponents() for a reference implementation.
*
* @param lhs array of row values
* @param rhs array of row values
* @param lhs key component from a row
* @param rhs key component from a row
* @param lhsEncodings encoding lookup from lhs's segment, null if not applicable for this dimension's type
* @param rhsEncodings encoding lookup from rhs's segment, null if not applicable for this dimension's type
*/
void validateSortedEncodedArrays(
EncodedTypeArray lhs,
EncodedTypeArray rhs,
void validateSortedEncodedKeyComponents(
EncodedKeyComponentType lhs,
EncodedKeyComponentType rhs,
Indexed<ActualType> lhsEncodings,
Indexed<ActualType> rhsEncodings
) throws SegmentValidationException;
@ -186,15 +193,16 @@ public interface DimensionHandler<EncodedType extends Comparable<EncodedType>, E
/**
* Given a subcolumn from getSubColumn, and the index of the current row, retrieve a row as an array of values.
* Given a subcolumn from getSubColumn, and the index of the current row, retrieve a dimension's values
* from a row as an EncodedKeyComponentType.
*
* For example:
* - A String-typed implementation would read the current row from a DictionaryEncodedColumn as an int[].
* - A long-typed implementation would read the current row from a GenericColumn and return the current row as a long[].
* - A long-typed implementation would read the current row from a GenericColumn and return a Long.
*
* @param column Column for this dimension from a QueryableIndex
* @param currRow The index of the row to retrieve
* @return The row from "column" specified by "currRow", as an array of values
* @return The key component for this dimension from the current row of the column.
*/
Object getRowValueArrayFromColumn(Closeable column, int currRow);
EncodedKeyComponentType getEncodedKeyComponentFromColumn(Closeable column, int currRow);
}
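To make the renamed type parameters concrete: in this change StringDimensionHandler is declared as `DimensionHandler<Integer, int[], String>`, while the new numeric handlers are `DimensionHandler<Long, Long, Long>` and `DimensionHandler<Float, Float, Float>`. The sketch below (illustrative only, not Druid code) shows how the key-component contract differs between the array-based String case and the single-object numeric case.

```java
// Illustrative sketch of the EncodedKeyComponentType contract; the real
// interface is io.druid.segment.DimensionHandler and carries more methods.
interface KeyComponentOps<EncodedKeyComponentType>
{
  int length(EncodedKeyComponentType key);

  int compareSorted(EncodedKeyComponentType lhs, EncodedKeyComponentType rhs);
}

public class KeyComponentContractSketch
{
  // String dimensions: the key component is an int[] of dictionary ids,
  // so multivalue rows are supported.
  static final KeyComponentOps<int[]> STRING_OPS = new KeyComponentOps<int[]>()
  {
    @Override
    public int length(int[] key)
    {
      return key.length;
    }

    @Override
    public int compareSorted(int[] lhs, int[] rhs)
    {
      // Per the interface contract, a shorter key component orders first;
      // otherwise the first differing element decides.
      int cmp = Integer.compare(lhs.length, rhs.length);
      for (int i = 0; cmp == 0 && i < lhs.length; i++) {
        cmp = Integer.compare(lhs[i], rhs[i]);
      }
      return cmp;
    }
  };

  // Long dimensions: the key component is a single Long, so there is nothing
  // to iterate; values compare directly and the "length" is a fixed row size.
  static final KeyComponentOps<Long> LONG_OPS = new KeyComponentOps<Long>()
  {
    @Override
    public int length(Long key)
    {
      return Long.BYTES; // the real handler returns LongColumn.ROW_SIZE
    }

    @Override
    public int compareSorted(Long lhs, Long rhs)
    {
      return lhs.compareTo(rhs);
    }
  };
}
```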

View File

@ -64,6 +64,14 @@ public final class DimensionHandlerUtils
return new StringDimensionHandler(dimensionName, multiValueHandling);
}
if (capabilities.getType() == ValueType.LONG) {
return new LongDimensionHandler(dimensionName);
}
if (capabilities.getType() == ValueType.FLOAT) {
return new FloatDimensionHandler(dimensionName);
}
// Return a StringDimensionHandler by default (null columns will be treated as String typed)
return new StringDimensionHandler(dimensionName, multiValueHandling);
}
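A hedged usage sketch of the dispatch above: given LONG-typed column capabilities, `getHandlerFromCapabilities` now returns a `LongDimensionHandler` instead of falling through to the String default. It assumes the `(dimensionName, capabilities, multiValueHandling)` signature implied by the hunk and a builder-style `ColumnCapabilitiesImpl.setType()`; adjust to the actual API.

```java
import io.druid.segment.DimensionHandler;
import io.druid.segment.DimensionHandlerUtils;
import io.druid.segment.column.ColumnCapabilitiesImpl;
import io.druid.segment.column.ValueType;

public class HandlerDispatchSketch
{
  public static void main(String[] args)
  {
    // Capabilities for a LONG-typed column (builder-style setter assumed).
    final ColumnCapabilitiesImpl caps = new ColumnCapabilitiesImpl().setType(ValueType.LONG);

    // Dispatch: LONG -> LongDimensionHandler, FLOAT -> FloatDimensionHandler,
    // anything else (including null columns) -> StringDimensionHandler.
    final DimensionHandler handler =
        DimensionHandlerUtils.getHandlerFromCapabilities("countryNum", caps, null);

    System.out.println(handler.getClass().getSimpleName()); // LongDimensionHandler
  }
}
```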

View File

@ -22,6 +22,7 @@ package io.druid.segment;
import io.druid.collections.bitmap.BitmapFactory;
import io.druid.collections.bitmap.MutableBitmap;
import io.druid.query.dimension.DimensionSpec;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.Indexed;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
@ -68,9 +69,9 @@ import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
* - getSortedEncodedValueFromUnsorted()
* - getUnsortedEncodedValueFromSorted()
* - getSortedIndexedValues()
* - convertUnsortedEncodedArrayToSortedEncodedArray()
* - convertUnsortedEncodedKeyComponentToSortedEncodedKeyComponent()
*
* calling processRowValsToUnsortedEncodedArray() afterwards can invalidate previously read sorted encoding values
* calling processRowValsToUnsortedEncodedKeyComponent() afterwards can invalidate previously read sorted encoding values
* (i.e., new values could be added that are inserted between existing values in the ordering).
*
*
@ -78,7 +79,7 @@ import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
* --------------------
* Each DimensionIndexer exists within the context of a single IncrementalIndex. Before IndexMerger.persist() is
* called on an IncrementalIndex, any associated DimensionIndexers should allow multiple threads to add data to the
* indexer via processRowValsToUnsortedEncodedArray() and allow multiple threads to read data via methods that only
* indexer via processRowValsToUnsortedEncodedKeyComponent() and allow multiple threads to read data via methods that only
* deal with unsorted encodings.
*
* As mentioned in the "Sorting and Ordering" section, writes and calls to the sorted encoding
@ -97,12 +98,22 @@ import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
* For example, in the RealtimePlumber and IndexGeneratorJob, the thread that performs index persist is started
* by the same thread that handles the row adds on an index, ensuring the adds are visible to the persist thread.
*
* @param <EncodedType> class of the encoded values
* @param <ActualType> class of the actual values
* @param <EncodedType> class of a single encoded value
* @param <EncodedKeyComponentType> A row key contains a component for each dimension; this parameter specifies the
* class of this dimension's key component. A column type that supports multivalue rows
* should use an array type (e.g., Strings would use int[]). Column types without
* multivalue row support should use single objects (e.g., Long, Float).
* @param <ActualType> class of a single actual value
*
*/
public interface DimensionIndexer<EncodedType extends Comparable<EncodedType>, EncodedTypeArray, ActualType extends Comparable<ActualType>>
public interface DimensionIndexer
<EncodedType extends Comparable<EncodedType>, EncodedKeyComponentType, ActualType extends Comparable<ActualType>>
{
/**
* @return The ValueType corresponding to this dimension indexer's ActualType.
*/
ValueType getValueType();
/**
* Given a single row value or list of row values (for multi-valued dimensions), update any internal data structures
* with the ingested values and return the row values as an array to be used within a TimeAndDims key.
@ -117,7 +128,7 @@ public interface DimensionIndexer<EncodedType extends Comparable<EncodedType>, E
*
* @return An array containing an encoded representation of the input row value.
*/
EncodedTypeArray processRowValsToUnsortedEncodedArray(Object dimValues);
EncodedKeyComponentType processRowValsToUnsortedEncodedKeyComponent(Object dimValues);
/**
@ -195,20 +206,55 @@ public interface DimensionIndexer<EncodedType extends Comparable<EncodedType>, E
/**
* Return an object used to read rows from a StorageAdapter's Cursor.
*
* e.g. String -> DimensionSelector
* Long -> LongColumnSelector
* Float -> FloatColumnSelector
*
* See StringDimensionIndexer.makeColumnValueSelector() for a reference implementation.
* Return an object used to read values from this indexer's column as Strings.
*
* @param spec Specifies the output name of a dimension and any extraction functions to be applied.
* @param currEntry Provides access to the current TimeAndDims object in the Cursor
* @param desc Descriptor object for this dimension within an IncrementalIndex
* @return A new object that reads rows from currEntry
*/
ColumnValueSelector makeColumnValueSelector(
DimensionSelector makeDimensionSelector(
DimensionSpec spec,
IncrementalIndexStorageAdapter.EntryHolder currEntry,
IncrementalIndex.DimensionDesc desc
);
/**
* Return an object used to read values from this indexer's column as Longs.
*
* @param currEntry Provides access to the current TimeAndDims object in the Cursor
* @param desc Descriptor object for this dimension within an IncrementalIndex
* @return A new object that reads rows from currEntry
*/
LongColumnSelector makeLongColumnSelector(
IncrementalIndexStorageAdapter.EntryHolder currEntry,
IncrementalIndex.DimensionDesc desc
);
/**
* Return an object used to read values from this indexer's column as Floats.
*
* @param currEntry Provides access to the current TimeAndDims object in the Cursor
* @param desc Descriptor object for this dimension within an IncrementalIndex
* @return A new object that reads rows from currEntry
*/
FloatColumnSelector makeFloatColumnSelector(
IncrementalIndexStorageAdapter.EntryHolder currEntry,
IncrementalIndex.DimensionDesc desc
);
/**
* Return an object used to read values from this indexer's column as Objects.
*
* @param spec Specifies the output name of a dimension and any extraction functions to be applied.
* @param currEntry Provides access to the current TimeAndDims object in the Cursor
* @param desc Descriptor object for this dimension within an IncrementalIndex
* @return A new object that reads rows from currEntry
*/
ObjectColumnSelector makeObjectColumnSelector(
DimensionSpec spec,
IncrementalIndexStorageAdapter.EntryHolder currEntry,
IncrementalIndex.DimensionDesc desc
@ -231,13 +277,13 @@ public interface DimensionIndexer<EncodedType extends Comparable<EncodedType>, E
* them to their actual type (e.g., performing a dictionary lookup for a dict-encoded String dimension),
* and comparing the actual values until a difference is found.
*
* Refer to StringDimensionIndexer.compareUnsortedEncodedArrays() for a reference implementation.
* Refer to StringDimensionIndexer.compareUnsortedEncodedKeyComponents() for a reference implementation.
*
* @param lhs dimension value array from a TimeAndDims key
* @param rhs dimension value array from a TimeAndDims key
* @return comparison of the two arrays
*/
int compareUnsortedEncodedArrays(EncodedTypeArray lhs, EncodedTypeArray rhs);
int compareUnsortedEncodedKeyComponents(EncodedKeyComponentType lhs, EncodedKeyComponentType rhs);
/**
@ -247,7 +293,7 @@ public interface DimensionIndexer<EncodedType extends Comparable<EncodedType>, E
* @param rhs dimension value array from a TimeAndDims key
* @return true if the two arrays are equal
*/
boolean checkUnsortedEncodedArraysEqual(EncodedTypeArray lhs, EncodedTypeArray rhs);
boolean checkUnsortedEncodedKeyComponentsEqual(EncodedKeyComponentType lhs, EncodedKeyComponentType rhs);
/**
@ -255,33 +301,35 @@ public interface DimensionIndexer<EncodedType extends Comparable<EncodedType>, E
* @param key dimension value array from a TimeAndDims key
* @return hashcode of the array
*/
int getUnsortedEncodedArrayHashCode(EncodedTypeArray key);
int getUnsortedEncodedKeyComponentHashCode(EncodedKeyComponentType key);
boolean LIST = true;
boolean ARRAY = false;
/**
* Given a row value array from a TimeAndDims key, as described in the documentation for compareUnsortedEncodedArrays(),
* convert the unsorted encoded values to a list or array of actual values.
* Given a row value array from a TimeAndDims key, as described in the documentation for
* compareUnsortedEncodedKeyComponents(), convert the unsorted encoded values to a list or array of actual values.
*
* If the key has one element, this method should return a single Object instead of an array or list, ignoring
* the asList parameter.
*
* @param key dimension value array from a TimeAndDims key
* @param asList if true, return a list; if false, return an array
* @return single value, array, or list containing the actual values corresponding to the encoded values in the input array
* @return single value, array, or list containing the actual values corresponding to the encoded values
* in the input array
*/
Object convertUnsortedEncodedArrayToActualArrayOrList(EncodedTypeArray key, boolean asList);
Object convertUnsortedEncodedKeyComponentToActualArrayOrList(EncodedKeyComponentType key, boolean asList);
/**
* Given a row value array from a TimeAndDims key, as described in the documentation for compareUnsortedEncodedArrays(),
* convert the unsorted encoded values to an array of sorted encoded values (i.e., sorted by their corresponding actual values)
* Given a row value array from a TimeAndDims key, as described in the documentation for
* compareUnsortedEncodedKeyComponents(), convert the unsorted encoded values to an array of sorted encoded values
* (i.e., sorted by their corresponding actual values)
*
* @param key dimension value array from a TimeAndDims key
* @return array containing the sorted encoded values corresponding to the unsorted encoded values in the input array
*/
EncodedTypeArray convertUnsortedEncodedArrayToSortedEncodedArray(EncodedTypeArray key);
EncodedKeyComponentType convertUnsortedEncodedKeyComponentToSortedEncodedKeyComponent(EncodedKeyComponentType key);
/**
@ -295,7 +343,7 @@ public interface DimensionIndexer<EncodedType extends Comparable<EncodedType>, E
* For example, if key is an int[] array with values [1,3,4] for a dictionary-encoded String dimension,
* and rowNum is 27, this function would set bit 27 in bitmapIndexes[1], bitmapIndexes[3], and bitmapIndexes[4]
*
* See StringDimensionIndexer.fillBitmapsFromUnsortedEncodedArray() for a reference implementation.
* See StringDimensionIndexer.fillBitmapsFromUnsortedEncodedKeyComponent() for a reference implementation.
*
* If a dimension type does not support bitmap indexes, this function will not be called
* and can be left unimplemented.
@ -305,5 +353,10 @@ public interface DimensionIndexer<EncodedType extends Comparable<EncodedType>, E
* @param bitmapIndexes array of bitmaps, indexed by integer dimension value
* @param factory bitmap factory
*/
void fillBitmapsFromUnsortedEncodedArray(EncodedTypeArray key, int rowNum, MutableBitmap[] bitmapIndexes, BitmapFactory factory);
void fillBitmapsFromUnsortedEncodedKeyComponent(
EncodedKeyComponentType key,
int rowNum,
MutableBitmap[] bitmapIndexes,
BitmapFactory factory
);
}
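As a quick sanity check on this contract at ingestion time, here is a small hedged sketch using the `LongDimensionIndexer` added below: single values are converted to the dimension's type and become the key component, while list (multivalue) input is rejected. It assumes `DimensionHandlerUtils.convertObjectToLong` accepts numeric strings.

```java
import io.druid.segment.DimensionIndexer;
import io.druid.segment.LongDimensionIndexer;

import java.util.Arrays;

public class NumericIngestSketch
{
  public static void main(String[] args)
  {
    final DimensionIndexer<Long, Long, Long> indexer = new LongDimensionIndexer();

    // Single row values become the Long key component for the TimeAndDims key
    // (assuming convertObjectToLong parses numeric strings).
    final Long key = indexer.processRowValsToUnsortedEncodedKeyComponent("1234");
    System.out.println(key); // 1234

    // Multivalue input is rejected: numeric dimensions have no multivalue support.
    try {
      indexer.processRowValsToUnsortedEncodedKeyComponent(Arrays.asList(1L, 2L));
    } catch (UnsupportedOperationException e) {
      System.out.println(e.getMessage());
    }
  }
}
```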

View File

@ -49,8 +49,13 @@ import java.util.List;
*
* A class implementing this interface is expected to be highly stateful, updating its internal state as these
* functions are called.
*
* @param <EncodedKeyComponentType> A row key contains a component for each dimension; this parameter specifies the
* class of this dimension's key component. A column type that supports multivalue rows
* should use an array type (e.g., Strings would use int[]). Column types without multivalue
* row support should use single objects (e.g., Long, Float).
*/
public interface DimensionMerger<EncodedTypedArray>
public interface DimensionMerger<EncodedKeyComponentType>
{
/**
* Given a list of segment adapters:
@ -72,7 +77,8 @@ public interface DimensionMerger<EncodedTypedArray>
/**
* Convert a row from a single segment to its equivalent representation in the merged set of rows.
* Convert a row's key component with per-segment encoding to its equivalent representation
* in the merged set of rows.
*
* This function is used by the index merging process to build the merged sequence of rows.
*
@ -83,17 +89,22 @@ public interface DimensionMerger<EncodedTypedArray>
* segment-specific dictionary values within the row to the common merged dictionary values
* determined during writeMergedValueMetadata().
*
* @param segmentRow A row from a segment to be converted to its representation within the merged sequence of rows.
* @param segmentRow A row's key component for this dimension. The encoding of the key component's
* values will be converted from per-segment encodings to the combined encodings from
* the merged sequence of rows.
* @param segmentIndexNumber Integer indicating which segment the row originated from.
*/
EncodedTypedArray convertSegmentRowValuesToMergedRowValues(EncodedTypedArray segmentRow, int segmentIndexNumber);
EncodedKeyComponentType convertSegmentRowValuesToMergedRowValues(
EncodedKeyComponentType segmentRow,
int segmentIndexNumber
);
/**
* Process a row from the merged sequence of rows and update the DimensionMerger's internal state.
* Process a key component from the merged sequence of rows and update the DimensionMerger's internal state.
*
* After constructing a merged sequence of rows across segments, the index merging process will
* iterate through these rows and pass row values from each dimension to their correspodning DimensionMergers.
* iterate through these rows and pass row key components from each dimension to their corresponding DimensionMergers.
*
* This allows each DimensionMerger to build its internal view of the sequence of merged rows, to be
* written out to a segment later.
@ -101,7 +112,7 @@ public interface DimensionMerger<EncodedTypedArray>
* @param rowValues The row values to be added.
* @throws IOException
*/
void processMergedRow(EncodedTypedArray rowValues) throws IOException;
void processMergedRow(EncodedKeyComponentType rowValues) throws IOException;
/**
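Putting the two methods together, the merge driver conceptually runs the loop below for each dimension. This is an illustrative sketch of the contract only, not IndexMerger's actual code; `segmentRows` and `segmentIndexNumber` are placeholder names.

```java
import io.druid.segment.DimensionMerger;

import java.io.IOException;
import java.util.List;

public class MergeLoopSketch
{
  // For one dimension: convert each per-segment key component into the merged
  // encoding space, then feed it to the merger's internal state.
  static <T> void mergeDimension(
      DimensionMerger<T> merger,
      List<T> segmentRows,   // key components read from one segment, in merged row order
      int segmentIndexNumber // which segment the rows originated from
  ) throws IOException
  {
    for (T segmentRow : segmentRows) {
      final T merged = merger.convertSegmentRowValuesToMergedRowValues(segmentRow, segmentIndexNumber);
      merger.processMergedRow(merged);
    }
  }
}
```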

View File

@ -34,7 +34,7 @@ import java.io.IOException;
*
* NOTE: Remove this class when the legacy IndexMerger is deprecated and removed.
*/
public interface DimensionMergerLegacy<EncodedTypeArray> extends DimensionMergerV9<EncodedTypeArray>
public interface DimensionMergerLegacy<EncodedKeyComponentType> extends DimensionMergerV9<EncodedKeyComponentType>
{
/**
* Write this dimension's value metadata to a file.

View File

@ -28,7 +28,7 @@ import java.io.IOException;
*
* DimensionMerger subclass to be used with IndexMergerV9.
*/
public interface DimensionMergerV9<EncodedTypeArray> extends DimensionMerger<EncodedTypeArray>
public interface DimensionMergerV9<EncodedKeyComponentType> extends DimensionMerger<EncodedKeyComponentType>
{
/**
* Return a ColumnDescriptor containing ColumnPartSerde objects appropriate for

View File

@ -0,0 +1,117 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.GenericColumn;
import io.druid.segment.column.FloatColumn;
import io.druid.segment.data.IOPeon;
import io.druid.segment.data.Indexed;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
public class FloatDimensionHandler implements DimensionHandler<Float, Float, Float>
{
private final String dimensionName;
public FloatDimensionHandler(String dimensionName)
{
this.dimensionName = dimensionName;
}
@Override
public String getDimensionName()
{
return dimensionName;
}
@Override
public DimensionIndexer<Float, Float, Float> makeIndexer()
{
return new FloatDimensionIndexer();
}
@Override
public DimensionMergerV9<Float> makeMerger(
IndexSpec indexSpec, File outDir, IOPeon ioPeon, ColumnCapabilities capabilities, ProgressIndicator progress
) throws IOException
{
return new FloatDimensionMergerV9(
dimensionName,
indexSpec,
outDir,
ioPeon,
capabilities,
progress
);
}
@Override
public DimensionMergerLegacy<Float> makeLegacyMerger(
IndexSpec indexSpec, File outDir, IOPeon ioPeon, ColumnCapabilities capabilities, ProgressIndicator progress
) throws IOException
{
return new FloatDimensionMergerLegacy(
dimensionName,
indexSpec,
outDir,
ioPeon,
capabilities,
progress
);
}
@Override
public int getLengthOfEncodedKeyComponent(Float dimVals)
{
return FloatColumn.ROW_SIZE;
}
@Override
public int compareSortedEncodedKeyComponents(Float lhs, Float rhs)
{
return lhs.compareTo(rhs);
}
@Override
public void validateSortedEncodedKeyComponents(
Float lhs, Float rhs, Indexed<Float> lhsEncodings, Indexed<Float> rhsEncodings
) throws SegmentValidationException
{
if (!lhs.equals(rhs)) {
throw new SegmentValidationException("Dim [%s] value not equal. Expected [%s] found [%s]", dimensionName, lhs, rhs);
}
}
@Override
public Closeable getSubColumn(Column column)
{
return column.getGenericColumn();
}
@Override
public Float getEncodedKeyComponentFromColumn(Closeable column, int currRow)
{
return ((GenericColumn) column).getFloatSingleValueRow(currRow);
}
}

View File

@ -0,0 +1,218 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
import io.druid.collections.bitmap.BitmapFactory;
import io.druid.collections.bitmap.MutableBitmap;
import io.druid.query.dimension.DimensionSpec;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.Indexed;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
import java.util.List;
public class FloatDimensionIndexer implements DimensionIndexer<Float, Float, Float>
{
@Override
public ValueType getValueType()
{
return ValueType.FLOAT;
}
@Override
public Float processRowValsToUnsortedEncodedKeyComponent(Object dimValues)
{
if (dimValues instanceof List) {
throw new UnsupportedOperationException("Numeric columns do not support multivalue rows.");
}
return DimensionHandlerUtils.convertObjectToFloat(dimValues);
}
@Override
public Float getSortedEncodedValueFromUnsorted(Float unsortedIntermediateValue)
{
return unsortedIntermediateValue;
}
@Override
public Float getUnsortedEncodedValueFromSorted(Float sortedIntermediateValue)
{
return sortedIntermediateValue;
}
@Override
public Indexed<Float> getSortedIndexedValues()
{
throw new UnsupportedOperationException("Numeric columns do not support value dictionaries.");
}
@Override
public Float getMinValue()
{
return Float.MIN_VALUE;
}
@Override
public Float getMaxValue()
{
return Float.MAX_VALUE;
}
@Override
public int getCardinality()
{
return DimensionSelector.CARDINALITY_UNKNOWN;
}
@Override
public DimensionSelector makeDimensionSelector(
DimensionSpec spec, IncrementalIndexStorageAdapter.EntryHolder currEntry, IncrementalIndex.DimensionDesc desc
)
{
return new FloatWrappingDimensionSelector(
makeFloatColumnSelector(currEntry, desc),
spec.getExtractionFn()
);
}
@Override
public LongColumnSelector makeLongColumnSelector(
final IncrementalIndexStorageAdapter.EntryHolder currEntry,
final IncrementalIndex.DimensionDesc desc
)
{
final int dimIndex = desc.getIndex();
class IndexerLongColumnSelector implements LongColumnSelector
{
@Override
public long get()
{
final Object[] dims = currEntry.getKey().getDims();
if (dimIndex >= dims.length) {
return 0L;
}
float floatVal = (Float) dims[dimIndex];
return (long) floatVal;
}
}
return new IndexerLongColumnSelector();
}
@Override
public FloatColumnSelector makeFloatColumnSelector(
final IncrementalIndexStorageAdapter.EntryHolder currEntry,
final IncrementalIndex.DimensionDesc desc
)
{
final int dimIndex = desc.getIndex();
class IndexerFloatColumnSelector implements FloatColumnSelector
{
@Override
public float get()
{
final Object[] dims = currEntry.getKey().getDims();
if (dimIndex >= dims.length) {
return 0.0f;
}
return (Float) dims[dimIndex];
}
}
return new IndexerFloatColumnSelector();
}
@Override
public ObjectColumnSelector makeObjectColumnSelector(
final DimensionSpec spec,
final IncrementalIndexStorageAdapter.EntryHolder currEntry,
final IncrementalIndex.DimensionDesc desc
)
{
final int dimIndex = desc.getIndex();
class IndexerObjectColumnSelector implements ObjectColumnSelector
{
@Override
public Class classOfObject()
{
return Float.class;
}
@Override
public Object get()
{
final Object[] dims = currEntry.getKey().getDims();
if (dimIndex >= dims.length) {
return 0.0f;
}
return dims[dimIndex];
}
}
return new IndexerObjectColumnSelector();
}
@Override
public int compareUnsortedEncodedKeyComponents(Float lhs, Float rhs)
{
return lhs.compareTo(rhs);
}
@Override
public boolean checkUnsortedEncodedKeyComponentsEqual(Float lhs, Float rhs)
{
return lhs.equals(rhs);
}
@Override
public int getUnsortedEncodedKeyComponentHashCode(Float key)
{
return key.hashCode();
}
@Override
public Object convertUnsortedEncodedKeyComponentToActualArrayOrList(Float key, boolean asList)
{
// key components for numeric dimensions hold a single value, which is returned
// directly per the interface contract (the asList parameter is ignored)
return key;
}
@Override
public Float convertUnsortedEncodedKeyComponentToSortedEncodedKeyComponent(Float key)
{
return key;
}
@Override
public void fillBitmapsFromUnsortedEncodedKeyComponent(
Float key, int rowNum, MutableBitmap[] bitmapIndexes, BitmapFactory factory
)
{
throw new UnsupportedOperationException("Numeric columns do not support bitmaps.");
}
}

View File

@ -0,0 +1,89 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
import com.google.common.io.ByteSink;
import com.google.common.io.OutputSupplier;
import io.druid.common.guava.FileOutputSupplier;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.IOPeon;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
public class FloatDimensionMergerLegacy extends FloatDimensionMergerV9 implements DimensionMergerLegacy<Float>
{
private FloatMetricColumnSerializer serializerV8;
public FloatDimensionMergerLegacy(
String dimensionName,
IndexSpec indexSpec,
File outDir,
IOPeon ioPeon,
ColumnCapabilities capabilities,
ProgressIndicator progress
)
{
super(dimensionName, indexSpec, outDir, ioPeon, capabilities, progress);
}
@Override
protected void setupEncodedValueWriter() throws IOException
{
final CompressedObjectStrategy.CompressionStrategy metCompression = indexSpec.getMetricCompression();
serializerV8 = new FloatMetricColumnSerializer(dimensionName, outDir, ioPeon, metCompression);
serializerV8.open();
}
@Override
public void processMergedRow(Float rowValues) throws IOException
{
serializerV8.serialize(rowValues);
}
@Override
public void writeValueMetadataToFile(FileOutputSupplier valueEncodingFile) throws IOException
{
// floats have no metadata to write
}
@Override
public void writeRowValuesToFile(FileOutputSupplier rowValueOut) throws IOException
{
// closing the serializer writes its data to the file
serializerV8.closeFile(rowValueOut.getFile());
}
@Override
public void writeIndexesToFiles(
ByteSink invertedOut, OutputSupplier<FileOutputStream> spatialOut
) throws IOException
{
// floats have no indices to write
}
@Override
public File makeDimFile() throws IOException
{
return IndexIO.makeNumericDimFile(outDir, dimensionName, IndexIO.BYTE_ORDER);
}
}

View File

@ -0,0 +1,122 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
import com.google.common.base.Throwables;
import com.google.common.io.Closer;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ColumnDescriptor;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.IOPeon;
import io.druid.segment.serde.FloatGenericColumnPartSerde;
import java.io.File;
import java.io.IOException;
import java.nio.IntBuffer;
import java.util.List;
public class FloatDimensionMergerV9 implements DimensionMergerV9<Float>
{
protected String dimensionName;
protected ProgressIndicator progress;
protected final IndexSpec indexSpec;
protected ColumnCapabilities capabilities;
protected final File outDir;
protected IOPeon ioPeon;
private FloatColumnSerializer serializer;
public FloatDimensionMergerV9(
String dimensionName,
IndexSpec indexSpec,
File outDir,
IOPeon ioPeon,
ColumnCapabilities capabilities,
ProgressIndicator progress
)
{
this.dimensionName = dimensionName;
this.indexSpec = indexSpec;
this.capabilities = capabilities;
this.outDir = outDir;
this.ioPeon = ioPeon;
this.progress = progress;
try {
setupEncodedValueWriter();
} catch (IOException ioe) {
Throwables.propagate(ioe);
}
}
protected void setupEncodedValueWriter() throws IOException
{
final CompressedObjectStrategy.CompressionStrategy metCompression = indexSpec.getMetricCompression();
this.serializer = FloatColumnSerializer.create(ioPeon, dimensionName, metCompression);
serializer.open();
}
@Override
public void writeMergedValueMetadata(List<IndexableAdapter> adapters) throws IOException
{
// floats have no additional metadata
}
@Override
public Float convertSegmentRowValuesToMergedRowValues(Float segmentRow, int segmentIndexNumber)
{
return segmentRow;
}
@Override
public void processMergedRow(Float rowValues) throws IOException
{
serializer.serialize(rowValues);
}
@Override
public void writeIndexes(List<IntBuffer> segmentRowNumConversions, Closer closer) throws IOException
{
// floats have no indices to write
}
@Override
public boolean canSkip()
{
// a float column can never be all null
return false;
}
@Override
public ColumnDescriptor makeColumnDescriptor() throws IOException
{
serializer.close();
final ColumnDescriptor.Builder builder = ColumnDescriptor.builder();
builder.setValueType(ValueType.FLOAT);
builder.addSerde(
FloatGenericColumnPartSerde.serializerBuilder()
.withByteOrder(IndexIO.BYTE_ORDER)
.withDelegate(serializer)
.build()
);
return builder.build();
}
}

View File

@ -74,6 +74,11 @@ public class FloatMetricColumnSerializer implements MetricColumnSerializer
public void close() throws IOException
{
final File outFile = IndexIO.makeMetricFile(outDir, metricName, IndexIO.BYTE_ORDER);
closeFile(outFile);
}
public void closeFile(final File outFile) throws IOException
{
outFile.delete();
MetricHolder.writeFloatMetric(
Files.asByteSink(outFile, FileWriteMode.APPEND), metricName, writer

View File

@ -354,7 +354,7 @@ public class IndexIO
}
DimensionHandler dimHandler = dimHandlers.get(dim1Name);
dimHandler.validateSortedEncodedArrays(
dimHandler.validateSortedEncodedKeyComponents(
dim1Vals,
dim2Vals,
adapter1.getDimValueLookup(dim1Name),

View File

@ -0,0 +1,117 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.GenericColumn;
import io.druid.segment.column.LongColumn;
import io.druid.segment.data.IOPeon;
import io.druid.segment.data.Indexed;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
public class LongDimensionHandler implements DimensionHandler<Long, Long, Long>
{
private final String dimensionName;
public LongDimensionHandler(String dimensionName)
{
this.dimensionName = dimensionName;
}
@Override
public String getDimensionName()
{
return dimensionName;
}
@Override
public DimensionIndexer<Long, Long, Long> makeIndexer()
{
return new LongDimensionIndexer();
}
@Override
public DimensionMergerV9<Long> makeMerger(
IndexSpec indexSpec, File outDir, IOPeon ioPeon, ColumnCapabilities capabilities, ProgressIndicator progress
) throws IOException
{
return new LongDimensionMergerV9(
dimensionName,
indexSpec,
outDir,
ioPeon,
capabilities,
progress
);
}
@Override
public DimensionMergerLegacy<Long> makeLegacyMerger(
IndexSpec indexSpec, File outDir, IOPeon ioPeon, ColumnCapabilities capabilities, ProgressIndicator progress
) throws IOException
{
return new LongDimensionMergerLegacy(
dimensionName,
indexSpec,
outDir,
ioPeon,
capabilities,
progress
);
}
@Override
public int getLengthOfEncodedKeyComponent(Long dimVals)
{
return LongColumn.ROW_SIZE;
}
@Override
public int compareSortedEncodedKeyComponents(Long lhs, Long rhs)
{
return lhs.compareTo(rhs);
}
@Override
public void validateSortedEncodedKeyComponents(
Long lhs, Long rhs, Indexed<Long> lhsEncodings, Indexed<Long> rhsEncodings
) throws SegmentValidationException
{
if (!lhs.equals(rhs)) {
throw new SegmentValidationException("Dim [%s] value not equal. Expected [%s] found [%s]", dimensionName, lhs, rhs);
}
}
@Override
public Closeable getSubColumn(Column column)
{
return column.getGenericColumn();
}
@Override
public Long getEncodedKeyComponentFromColumn(Closeable column, int currRow)
{
return ((GenericColumn) column).getLongSingleValueRow(currRow);
}
}

View File

@ -0,0 +1,218 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
import io.druid.collections.bitmap.BitmapFactory;
import io.druid.collections.bitmap.MutableBitmap;
import io.druid.query.dimension.DimensionSpec;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.Indexed;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
import java.util.List;
public class LongDimensionIndexer implements DimensionIndexer<Long, Long, Long>
{
@Override
public ValueType getValueType()
{
return ValueType.LONG;
}
@Override
public Long processRowValsToUnsortedEncodedKeyComponent(Object dimValues)
{
if (dimValues instanceof List) {
throw new UnsupportedOperationException("Numeric columns do not support multivalue rows.");
}
return DimensionHandlerUtils.convertObjectToLong(dimValues);
}
@Override
public Long getSortedEncodedValueFromUnsorted(Long unsortedIntermediateValue)
{
return unsortedIntermediateValue;
}
@Override
public Long getUnsortedEncodedValueFromSorted(Long sortedIntermediateValue)
{
return sortedIntermediateValue;
}
@Override
public Indexed<Long> getSortedIndexedValues()
{
throw new UnsupportedOperationException("Numeric columns do not support value dictionaries.");
}
@Override
public Long getMinValue()
{
return Long.MIN_VALUE;
}
@Override
public Long getMaxValue()
{
return Long.MAX_VALUE;
}
@Override
public int getCardinality()
{
return DimensionSelector.CARDINALITY_UNKNOWN;
}
@Override
public DimensionSelector makeDimensionSelector(
DimensionSpec spec, IncrementalIndexStorageAdapter.EntryHolder currEntry, IncrementalIndex.DimensionDesc desc
)
{
return new LongWrappingDimensionSelector(
makeLongColumnSelector(currEntry, desc),
spec.getExtractionFn()
);
}
@Override
public LongColumnSelector makeLongColumnSelector(
final IncrementalIndexStorageAdapter.EntryHolder currEntry,
final IncrementalIndex.DimensionDesc desc
)
{
final int dimIndex = desc.getIndex();
class IndexerLongColumnSelector implements LongColumnSelector
{
@Override
public long get()
{
final Object[] dims = currEntry.getKey().getDims();
if (dimIndex >= dims.length) {
return 0L;
}
return (Long) dims[dimIndex];
}
}
return new IndexerLongColumnSelector();
}
@Override
public FloatColumnSelector makeFloatColumnSelector(
final IncrementalIndexStorageAdapter.EntryHolder currEntry,
final IncrementalIndex.DimensionDesc desc
)
{
final int dimIndex = desc.getIndex();
class IndexerFloatColumnSelector implements FloatColumnSelector
{
@Override
public float get()
{
final Object[] dims = currEntry.getKey().getDims();
if (dimIndex >= dims.length) {
return 0.0f;
}
long longVal = (Long) dims[dimIndex];
return (float) longVal;
}
}
return new IndexerFloatColumnSelector();
}
@Override
public ObjectColumnSelector makeObjectColumnSelector(
final DimensionSpec spec,
final IncrementalIndexStorageAdapter.EntryHolder currEntry,
final IncrementalIndex.DimensionDesc desc
)
{
final int dimIndex = desc.getIndex();
class IndexerObjectColumnSelector implements ObjectColumnSelector
{
@Override
public Class classOfObject()
{
return Long.class;
}
@Override
public Object get()
{
final Object[] dims = currEntry.getKey().getDims();
if (dimIndex >= dims.length) {
return 0L;
}
return dims[dimIndex];
}
}
return new IndexerObjectColumnSelector();
}
@Override
public int compareUnsortedEncodedKeyComponents(Long lhs, Long rhs)
{
return lhs.compareTo(rhs);
}
@Override
public boolean checkUnsortedEncodedKeyComponentsEqual(Long lhs, Long rhs)
{
return lhs.equals(rhs);
}
@Override
public int getUnsortedEncodedKeyComponentHashCode(Long key)
{
return key.hashCode();
}
@Override
public Object convertUnsortedEncodedKeyComponentToActualArrayOrList(Long key, boolean asList)
{
// key components for numeric dimensions hold a single value, which is returned
// directly per the interface contract (the asList parameter is ignored)
return key;
}
@Override
public Long convertUnsortedEncodedKeyComponentToSortedEncodedKeyComponent(Long key)
{
return key;
}
@Override
public void fillBitmapsFromUnsortedEncodedKeyComponent(
Long key, int rowNum, MutableBitmap[] bitmapIndexes, BitmapFactory factory
)
{
throw new UnsupportedOperationException("Numeric columns do not support bitmaps.");
}
}

View File

@ -0,0 +1,91 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
import com.google.common.io.ByteSink;
import com.google.common.io.OutputSupplier;
import io.druid.common.guava.FileOutputSupplier;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.CompressionFactory;
import io.druid.segment.data.IOPeon;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
public class LongDimensionMergerLegacy extends LongDimensionMergerV9 implements DimensionMergerLegacy<Long>
{
private LongMetricColumnSerializer serializerV8;
public LongDimensionMergerLegacy(
String dimensionName,
IndexSpec indexSpec,
File outDir,
IOPeon ioPeon,
ColumnCapabilities capabilities,
ProgressIndicator progress
)
{
super(dimensionName, indexSpec, outDir, ioPeon, capabilities, progress);
}
@Override
protected void setupEncodedValueWriter() throws IOException
{
final CompressedObjectStrategy.CompressionStrategy metCompression = indexSpec.getMetricCompression();
final CompressionFactory.LongEncodingStrategy longEncoding = indexSpec.getLongEncoding();
serializerV8 = new LongMetricColumnSerializer(dimensionName, outDir, ioPeon, metCompression, longEncoding);
serializerV8.open();
}
@Override
public void processMergedRow(Long rowValues) throws IOException
{
serializerV8.serialize(rowValues);
}
@Override
public void writeValueMetadataToFile(FileOutputSupplier valueEncodingFile) throws IOException
{
// longs have no metadata to write
}
@Override
public void writeRowValuesToFile(FileOutputSupplier rowValueOut) throws IOException
{
// closing the serializer writes its data to the file
serializerV8.closeFile(rowValueOut.getFile());
}
@Override
public void writeIndexesToFiles(
ByteSink invertedOut, OutputSupplier<FileOutputStream> spatialOut
) throws IOException
{
// longs have no indices to write
}
@Override
public File makeDimFile() throws IOException
{
return IndexIO.makeNumericDimFile(outDir, dimensionName, IndexIO.BYTE_ORDER);
}
}

View File

@ -0,0 +1,123 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
import com.google.common.base.Throwables;
import com.google.common.io.Closer;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ColumnDescriptor;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.CompressionFactory;
import io.druid.segment.data.IOPeon;
import io.druid.segment.serde.LongGenericColumnPartSerde;
import java.io.File;
import java.io.IOException;
import java.nio.IntBuffer;
import java.util.List;
public class LongDimensionMergerV9 implements DimensionMergerV9<Long>
{
protected String dimensionName;
protected ProgressIndicator progress;
protected final IndexSpec indexSpec;
protected ColumnCapabilities capabilities;
protected final File outDir;
protected IOPeon ioPeon;
protected LongColumnSerializer serializer;
public LongDimensionMergerV9(
String dimensionName,
IndexSpec indexSpec,
File outDir,
IOPeon ioPeon,
ColumnCapabilities capabilities,
ProgressIndicator progress
)
{
this.dimensionName = dimensionName;
this.indexSpec = indexSpec;
this.capabilities = capabilities;
this.outDir = outDir;
this.ioPeon = ioPeon;
this.progress = progress;
try {
setupEncodedValueWriter();
} catch (IOException ioe) {
Throwables.propagate(ioe);
}
}
protected void setupEncodedValueWriter() throws IOException
{
final CompressedObjectStrategy.CompressionStrategy metCompression = indexSpec.getMetricCompression();
final CompressionFactory.LongEncodingStrategy longEncoding = indexSpec.getLongEncoding();
this.serializer = LongColumnSerializer.create(ioPeon, dimensionName, metCompression, longEncoding);
serializer.open();
}
@Override
public void writeMergedValueMetadata(List<IndexableAdapter> adapters) throws IOException
{
// longs have no additional metadata
}
@Override
public Long convertSegmentRowValuesToMergedRowValues(Long segmentRow, int segmentIndexNumber)
{
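// Longs are not dictionary-encoded, so per-segment values need no remapping
// during merge; the encoded key component passes through unchanged.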
return segmentRow;
}
@Override
public void processMergedRow(Long rowValues) throws IOException
{
serializer.serialize(rowValues);
}
@Override
public void writeIndexes(List<IntBuffer> segmentRowNumConversions, Closer closer) throws IOException
{
// longs have no indices to write
}
@Override
public boolean canSkip()
{
// a long column can never be all null
return false;
}
@Override
public ColumnDescriptor makeColumnDescriptor() throws IOException
{
serializer.close();
final ColumnDescriptor.Builder builder = ColumnDescriptor.builder();
builder.setValueType(ValueType.LONG);
builder.addSerde(
LongGenericColumnPartSerde.serializerBuilder()
.withByteOrder(IndexIO.BYTE_ORDER)
.withDelegate(serializer)
.build()
);
return builder.build();
}
}
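For orientation, a minimal sketch of how a merge driver could use this class end to end. This is not the actual IndexMergerV9 code: the method name, the `mergedValues` iterable, and the parameter plumbing are illustrative stand-ins.

```java
// Hedged sketch (assumes the Druid 0.10-era classes from this diff on the classpath).
// writeMergedValueMetadata() and writeIndexes() are skipped: both are no-ops for longs.
static ColumnDescriptor mergeLongDimension(
    IndexSpec indexSpec,
    File outDir,
    IOPeon ioPeon,
    ColumnCapabilities capabilities,
    ProgressIndicator progress,
    Iterable<Long> mergedValues      // hypothetical source of merged row values
) throws IOException
{
  final LongDimensionMergerV9 merger =
      new LongDimensionMergerV9("countryNum", indexSpec, outDir, ioPeon, capabilities, progress);
  for (Long value : mergedValues) {
    merger.processMergedRow(value);  // serialize each merged row's value
  }
  return merger.makeColumnDescriptor(); // closes the serializer and builds the LONG column serde
}
```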

View File

@ -77,6 +77,11 @@ public class LongMetricColumnSerializer implements MetricColumnSerializer
public void close() throws IOException
{
final File outFile = IndexIO.makeMetricFile(outDir, metricName, IndexIO.BYTE_ORDER);
closeFile(outFile);
}
public void closeFile(final File outFile) throws IOException
{
outFile.delete();
MetricHolder.writeLongMetric(
Files.asByteSink(outFile, FileWriteMode.APPEND), metricName, writer

View File

@ -19,6 +19,7 @@
package io.druid.segment;
import java.io.File;
import java.io.IOException;
/**
@ -28,4 +29,5 @@ public interface MetricColumnSerializer
public void open() throws IOException;
public void serialize(Object aggs) throws IOException;
public void close() throws IOException;
public void closeFile(File outFile) throws IOException;
}
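Design note: `closeFile(File)` was added so the numeric dimension mergers can reuse the metric serializers while directing output to a dimension file (see `writeRowValuesToFile` in the legacy long merger above); `close()` keeps its old behavior by delegating to `closeFile` with the default metric file.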

View File

@ -255,7 +255,7 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
final Object[] dims = new Object[columns.length];
int dimIndex = 0;
for (final Closeable column : columns) {
dims[dimIndex] = handlers[dimIndex].getRowValueArrayFromColumn(column, currRow);
dims[dimIndex] = handlers[dimIndex].getEncodedKeyComponentFromColumn(column, currRow);
dimIndex++;
}

View File

@ -122,7 +122,7 @@ public class Rowboat implements Comparable<Rowboat>
}
DimensionHandler handler = handlers[index];
retVal = handler.compareSortedEncodedArrays(lhsVals, rhsVals);
retVal = handler.compareSortedEncodedKeyComponents(lhsVals, rhsVals);
++index;
}

View File

@ -51,13 +51,13 @@ public class StringDimensionHandler implements DimensionHandler<Integer, int[],
}
@Override
public int getLengthFromEncodedArray(int[] dimVals)
public int getLengthOfEncodedKeyComponent(int[] dimVals)
{
return dimVals.length;
}
@Override
public int compareSortedEncodedArrays(int[] lhs, int[] rhs)
public int compareSortedEncodedKeyComponents(int[] lhs, int[] rhs)
{
int lhsLen = lhs.length;
int rhsLen = rhs.length;
@ -73,7 +73,7 @@ public class StringDimensionHandler implements DimensionHandler<Integer, int[],
}
@Override
public void validateSortedEncodedArrays(
public void validateSortedEncodedKeyComponents(
int[] lhs,
int[] rhs,
Indexed<String> lhsEncodings,
@ -166,7 +166,7 @@ public class StringDimensionHandler implements DimensionHandler<Integer, int[],
}
@Override
public Object getRowValueArrayFromColumn(Closeable column, int currRow)
public int[] getEncodedKeyComponentFromColumn(Closeable column, int currRow)
{
DictionaryEncodedColumn dict = (DictionaryEncodedColumn) column;
int[] theVals;

View File

@ -32,6 +32,7 @@ import io.druid.data.input.impl.DimensionSchema.MultiValueHandling;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.extraction.ExtractionFn;
import io.druid.query.filter.ValueMatcher;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.ArrayBasedIndexedInts;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedInts;
@ -220,7 +221,13 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
}
@Override
public int[] processRowValsToUnsortedEncodedArray(Object dimValues)
public ValueType getValueType()
{
return ValueType.STRING;
}
@Override
public int[] processRowValsToUnsortedEncodedKeyComponent(Object dimValues)
{
final int[] encodedDimensionValues;
final int oldDictSize = dimLookup.size();
@ -340,7 +347,7 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
}
@Override
public int compareUnsortedEncodedArrays(int[] lhs, int[] rhs)
public int compareUnsortedEncodedKeyComponents(int[] lhs, int[] rhs)
{
int lhsLen = lhs.length;
int rhsLen = rhs.length;
@ -365,19 +372,19 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
}
@Override
public boolean checkUnsortedEncodedArraysEqual(int[] lhs, int[] rhs)
public boolean checkUnsortedEncodedKeyComponentsEqual(int[] lhs, int[] rhs)
{
return Arrays.equals(lhs, rhs);
}
@Override
public int getUnsortedEncodedArrayHashCode(int[] key)
public int getUnsortedEncodedKeyComponentHashCode(int[] key)
{
return Arrays.hashCode(key);
}
@Override
public DimensionSelector makeColumnValueSelector(
public DimensionSelector makeDimensionSelector(
final DimensionSpec spec,
final IncrementalIndexStorageAdapter.EntryHolder currEntry,
final IncrementalIndex.DimensionDesc desc
@ -539,7 +546,70 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
}
@Override
public Object convertUnsortedEncodedArrayToActualArrayOrList(int[] key, boolean asList)
public LongColumnSelector makeLongColumnSelector(
IncrementalIndexStorageAdapter.EntryHolder currEntry, IncrementalIndex.DimensionDesc desc
)
{
return ZeroLongColumnSelector.instance();
}
@Override
public FloatColumnSelector makeFloatColumnSelector(
IncrementalIndexStorageAdapter.EntryHolder currEntry, IncrementalIndex.DimensionDesc desc
)
{
return ZeroFloatColumnSelector.instance();
}
@Override
public ObjectColumnSelector makeObjectColumnSelector(
final DimensionSpec spec,
final IncrementalIndexStorageAdapter.EntryHolder currEntry,
final IncrementalIndex.DimensionDesc desc
)
{
final ExtractionFn extractionFn = spec.getExtractionFn();
final int dimIndex = desc.getIndex();
class StringIndexerObjectColumnSelector implements ObjectColumnSelector<String>
{
@Override
public Class<String> classOfObject()
{
return String.class;
}
@Override
public String get()
{
final Object[] dims = currEntry.getKey().getDims();
int[] indices;
if (dimIndex < dims.length) {
indices = (int[]) dims[dimIndex];
if (indices.length > 1) {
throw new UnsupportedOperationException(
"makeObjectColumnSelector does not support multi-value columns."
);
}
} else {
indices = null;
}
if (indices == null || indices.length == 0) {
return extractionFn == null ? null : extractionFn.apply(null);
}
final String strValue = getActualValue(indices[0], false);
return extractionFn == null ? strValue : extractionFn.apply(strValue);
}
}
return new StringIndexerObjectColumnSelector();
}
@Override
public Object convertUnsortedEncodedKeyComponentToActualArrayOrList(int[] key, boolean asList)
{
if (key == null || key.length == 0) {
return null;
@ -568,7 +638,7 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
}
@Override
public int[] convertUnsortedEncodedArrayToSortedEncodedArray(int[] key)
public int[] convertUnsortedEncodedKeyComponentToSortedEncodedKeyComponent(int[] key)
{
int[] sortedDimVals = new int[key.length];
for (int i = 0; i < key.length; ++i) {
@ -579,7 +649,7 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
}
@Override
public void fillBitmapsFromUnsortedEncodedArray(
public void fillBitmapsFromUnsortedEncodedKeyComponent(
int[] key, int rowNum, MutableBitmap[] bitmapIndexes, BitmapFactory factory
)
{

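Per the commit note "Throw exception for bitmap method on numerics", the numeric indexers are expected to reject this call rather than silently no-op. A hedged sketch of the long indexer's counterpart to the method above (the exact exception type and message are assumptions, not the verbatim LongDimensionIndexer code):

```java
// Sketch only: numeric columns have no value dictionary, hence no bitmaps to fill.
@Override
public void fillBitmapsFromUnsortedEncodedKeyComponent(
    Long key, int rowNum, MutableBitmap[] bitmapIndexes, BitmapFactory factory
)
{
  throw new UnsupportedOperationException("Numeric dimensions do not support bitmap indexes.");
}
```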
View File

@ -25,6 +25,8 @@ import io.druid.segment.data.CompressedFloatsIndexedSupplier;
*/
public class FloatColumn extends AbstractColumn
{
public static final int ROW_SIZE = 1;
private static final ColumnCapabilitiesImpl CAPABILITIES = new ColumnCapabilitiesImpl()
.setType(ValueType.FLOAT);

View File

@ -25,6 +25,8 @@ import io.druid.segment.data.CompressedLongsIndexedSupplier;
*/
public class LongColumn extends AbstractColumn
{
public static final int ROW_SIZE = 1;
private static final ColumnCapabilitiesImpl CAPABILITIES = new ColumnCapabilitiesImpl()
.setType(ValueType.LONG);

View File

@ -430,10 +430,10 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
}
DimensionHandler handler = desc.getHandler();
DimensionIndexer indexer = desc.getIndexer();
Object dimsKey = indexer.processRowValsToUnsortedEncodedArray(row.getRaw(dimension));
Object dimsKey = indexer.processRowValsToUnsortedEncodedKeyComponent(row.getRaw(dimension));
// Set column capabilities as data is coming in
if (!capabilities.hasMultipleValues() && dimsKey != null && handler.getLengthFromEncodedArray(dimsKey) > 1) {
if (!capabilities.hasMultipleValues() && dimsKey != null && handler.getLengthOfEncodedKeyComponent(dimsKey) > 1) {
capabilities.setHasMultipleValues(true);
}
@ -704,12 +704,12 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
}
String dimensionName = dimensionDesc.getName();
DimensionHandler handler = dimensionDesc.getHandler();
if (dim == null || handler.getLengthFromEncodedArray(dim) == 0) {
if (dim == null || handler.getLengthOfEncodedKeyComponent(dim) == 0) {
theVals.put(dimensionName, null);
continue;
}
final DimensionIndexer indexer = dimensionDesc.getIndexer();
Object rowVals = indexer.convertUnsortedEncodedArrayToActualArrayOrList(dim, DimensionIndexer.LIST);
Object rowVals = indexer.convertUnsortedEncodedKeyComponentToActualArrayOrList(dim, DimensionIndexer.LIST);
theVals.put(dimensionName, rowVals);
}
@ -894,7 +894,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
}
for (int i = 0; i < dims.length; i++) {
final DimensionIndexer indexer = dimensionDescsList.get(i).getIndexer();
if (!indexer.checkUnsortedEncodedArraysEqual(dims[i], that.dims[i])) {
if (!indexer.checkUnsortedEncodedKeyComponentsEqual(dims[i], that.dims[i])) {
return false;
}
}
@ -907,7 +907,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
int hash = (int) timestamp;
for (int i = 0; i < dims.length; i++) {
final DimensionIndexer indexer = dimensionDescsList.get(i).getIndexer();
hash = 31 * hash + indexer.getUnsortedEncodedArrayHashCode(dims[i]);
hash = 31 * hash + indexer.getUnsortedEncodedKeyComponentHashCode(dims[i]);
}
return hash;
}
@ -961,7 +961,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
}
final DimensionIndexer indexer = dimensionDescs.get(index).getIndexer();
retVal = indexer.compareUnsortedEncodedArrays(lhsIdxs, rhsIdxs);
retVal = indexer.compareUnsortedEncodedKeyComponents(lhsIdxs, rhsIdxs);
++index;
}

View File

@ -104,7 +104,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter
// Add 'null' to the dimension's dictionary.
if (dimIndex >= dims.length || dims[dimIndex] == null) {
accessor.indexer.processRowValsToUnsortedEncodedArray(null);
accessor.indexer.processRowValsToUnsortedEncodedKeyComponent(null);
continue;
}
final ColumnCapabilities capabilities = dimension.getCapabilities();
@ -112,7 +112,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter
if(capabilities.hasBitmapIndexes()) {
final MutableBitmap[] bitmapIndexes = accessor.invertedIndexes;
final DimensionIndexer indexer = accessor.indexer;
indexer.fillBitmapsFromUnsortedEncodedArray(dims[dimIndex], rowNum, bitmapIndexes, bitmapFactory);
indexer.fillBitmapsFromUnsortedEncodedKeyComponent(dims[dimIndex], rowNum, bitmapIndexes, bitmapFactory);
}
}
++rowNum;
@ -199,7 +199,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter
}
final DimensionIndexer indexer = indexers[dimIndex];
Object sortedDimVals = indexer.convertUnsortedEncodedArrayToSortedEncodedArray(dimValues[dimIndex]);
Object sortedDimVals = indexer.convertUnsortedEncodedKeyComponentToSortedEncodedKeyComponent(dimValues[dimIndex]);
dims[dimIndex] = sortedDimVals;
}

View File

@ -27,7 +27,6 @@ import io.druid.java.util.common.granularity.Granularity;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.QueryInterruptedException;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.extraction.ExtractionFn;
import io.druid.query.filter.Filter;
@ -363,23 +362,26 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
return selector;
}
final IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(dimensionSpec.getDimension());
if (dimensionDesc == null) {
  // not a dimension, column may be a metric
  ColumnCapabilities capabilities = getColumnCapabilities(dimension);
  if (capabilities == null) {
    return NullDimensionSelector.instance();
  }
  if (capabilities.getType() == ValueType.LONG) {
    return new LongWrappingDimensionSelector(makeLongColumnSelector(dimension), extractionFn);
  }
  if (capabilities.getType() == ValueType.FLOAT) {
    return new FloatWrappingDimensionSelector(makeFloatColumnSelector(dimension), extractionFn);
  }
  // if we can't wrap the base column, just return a column of all nulls
  return NullDimensionSelector.instance();
} else {
  final DimensionIndexer indexer = dimensionDesc.getIndexer();
  return indexer.makeDimensionSelector(dimensionSpec, currEntry, dimensionDesc);
}
}
@Override
@ -393,8 +395,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
if (dimIndex != null) {
final IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(columnName);
final DimensionIndexer indexer = dimensionDesc.getIndexer();
return (FloatColumnSelector) indexer.makeColumnValueSelector(
new DefaultDimensionSpec(columnName, null),
return indexer.makeFloatColumnSelector(
currEntry,
dimensionDesc
);
@ -438,8 +439,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
if (dimIndex != null) {
final IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(columnName);
final DimensionIndexer indexer = dimensionDesc.getIndexer();
return (LongColumnSelector) indexer.makeColumnValueSelector(
new DefaultDimensionSpec(columnName, null),
return indexer.makeLongColumnSelector(
currEntry,
dimensionDesc
);
@ -542,7 +542,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
return null;
}
return indexer.convertUnsortedEncodedArrayToActualArrayOrList(
return indexer.convertUnsortedEncodedKeyComponentToActualArrayOrList(
dims[dimensionIndex], DimensionIndexer.ARRAY
);
}
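Design note: the selector lookup above consults the incremental index's dimension table first and only falls back to wrapping a long/float metric column, so a column ingested as a numeric dimension is always served by its typed DimensionIndexer rather than the metric-wrapping path.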

View File

@ -76,6 +76,12 @@ public class ComplexMetricColumnSerializer implements MetricColumnSerializer
writer.close();
final File outFile = IndexIO.makeMetricFile(outDir, metricName, IndexIO.BYTE_ORDER);
closeFile(outFile);
}
@Override
public void closeFile(final File outFile) throws IOException
{
outFile.delete();
MetricHolder.writeComplexMetric(
Files.newOutputStreamSupplier(outFile, true), metricName, serde.getTypeName(), writer

View File

@ -7916,4 +7916,50 @@ public class GroupByQueryRunnerTest
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
@Test
public void testGroupByWithAggsOnNumericDimensions()
{
GroupByQuery query = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setDimFilter(new SelectorDimFilter("quality", "technology", null))
.setAggregatorSpecs(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory("qlLong", "qualityLong"),
new DoubleSumAggregatorFactory("qlFloat", "qualityLong"),
new DoubleSumAggregatorFactory("qfFloat", "qualityFloat"),
new LongSumAggregatorFactory("qfLong", "qualityFloat")
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();
List<Row> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow(
"2011-04-01",
"alias", "technology",
"rows", 1L,
"qlLong", 1700L,
"qlFloat", 1700.0,
"qfFloat", 17000.0,
"qfLong", 17000L
),
GroupByQueryRunnerTestHelper.createExpectedRow(
"2011-04-02",
"alias", "technology",
"rows", 1L,
"qlLong", 1700L,
"qlFloat", 1700.0,
"qfFloat", 17000.0,
"qfLong", 17000L
)
);
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
}
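The four extra aggregators pair each numeric dimension with both a long-typed and a double-typed sum, so the test doubles as a check that numeric dimension values coerce cleanly in either direction. A toy illustration of the arithmetic behind the expected rows (plain Java, not Druid code):

```java
public class NumericDimCoercionExample
{
  public static void main(String[] args)
  {
    // one "technology" row per day, with qualityLong = 1700 and qualityFloat = 17000
    long qualityLong = 1700L;
    float qualityFloat = 17000.0f;
    System.out.println((double) qualityLong);  // 1700.0 -> the expected "qlFloat"
    System.out.println((long) qualityFloat);   // 17000  -> the expected "qfLong"
  }
}
```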

View File

@ -20,6 +20,7 @@
package io.druid.query.metadata;
import com.google.common.collect.Lists;
import io.druid.data.input.impl.DimensionSchema;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.LegacyDataSource;
import io.druid.query.QueryRunner;
@ -75,15 +76,21 @@ public class SegmentAnalyzerTest
columns.size()
); // All columns including time and empty/null column
for (DimensionSchema schema : TestIndex.DIMENSION_SCHEMAS) {
  final String dimension = schema.getName();
  final ColumnAnalysis columnAnalysis = columns.get(dimension);
  final boolean isString = schema.getValueType().name().equals(ValueType.STRING.name());
  Assert.assertEquals(dimension, schema.getValueType().name(), columnAnalysis.getType());
  Assert.assertEquals(dimension, 0, columnAnalysis.getSize());
  if (isString) {
    if (analyses == null) {
      Assert.assertTrue(dimension, columnAnalysis.getCardinality() > 0);
    } else {
      Assert.assertEquals(dimension, 0, columnAnalysis.getCardinality().longValue());
    }
  } else {
    Assert.assertNull(dimension, columnAnalysis.getCardinality());
  }
}
@ -121,17 +128,24 @@ public class SegmentAnalyzerTest
columns.size()
); // All columns including time and excluding empty/null column
for (DimensionSchema schema : TestIndex.DIMENSION_SCHEMAS) {
  final String dimension = schema.getName();
  final ColumnAnalysis columnAnalysis = columns.get(dimension);
  if (dimension.equals("null_column")) {
    Assert.assertNull(columnAnalysis);
  } else {
    final boolean isString = schema.getValueType().name().equals(ValueType.STRING.name());
    Assert.assertEquals(dimension, schema.getValueType().name(), columnAnalysis.getType());
    Assert.assertEquals(dimension, 0, columnAnalysis.getSize());
    if (isString) {
      if (analyses == null) {
        Assert.assertTrue(dimension, columnAnalysis.getCardinality() > 0);
      } else {
        Assert.assertEquals(dimension, 0, columnAnalysis.getCardinality().longValue());
      }
    } else {
      Assert.assertNull(dimension, columnAnalysis.getCardinality());
    }
  }
}

View File

@ -165,8 +165,8 @@ public class SelectQueryRunnerTest
PagingOffset offset = query.getPagingOffset(QueryRunnerTestHelper.segmentId);
List<Result<SelectResultValue>> expectedResults = toExpected(
toFullEvents(V_0112_0114),
Lists.newArrayList("market", "quality", "qualityNumericString", "placement", "placementish", "partial_null_column", "null_column"),
Lists.newArrayList("index", "quality_uniques", "qualityLong", "qualityFloat", "indexMin", "indexMaxPlusTen"),
Lists.newArrayList("market", "quality", "qualityLong", "qualityFloat", "qualityNumericString", "placement", "placementish", "partial_null_column", "null_column"),
Lists.newArrayList("index", "quality_uniques", "indexMin", "indexMaxPlusTen"),
offset.startOffset(),
offset.threshold()
);
@ -255,7 +255,7 @@ public class SelectQueryRunnerTest
new SelectResultValue(
ImmutableMap.of(QueryRunnerTestHelper.segmentId, 2),
Sets.newHashSet("mar", "qual", "place"),
Sets.newHashSet("index", "quality_uniques", "qualityLong", "qualityFloat", "indexMin", "indexMaxPlusTen"),
Sets.newHashSet("index", "quality_uniques", "indexMin", "indexMaxPlusTen"),
Arrays.asList(
new EventHolder(
QueryRunnerTestHelper.segmentId,
@ -301,7 +301,7 @@ public class SelectQueryRunnerTest
new SelectResultValue(
ImmutableMap.of(QueryRunnerTestHelper.segmentId, -3),
Sets.newHashSet("mar", "qual", "place"),
Sets.newHashSet("index", "qualityLong", "qualityFloat", "quality_uniques", "indexMin", "indexMaxPlusTen"),
Sets.newHashSet("index", "quality_uniques", "indexMin", "indexMaxPlusTen"),
Arrays.asList(
new EventHolder(
QueryRunnerTestHelper.segmentId,
@ -617,8 +617,8 @@ public class SelectQueryRunnerTest
new DateTime("2011-01-12T00:00:00.000Z"),
new SelectResultValue(
ImmutableMap.<String, Integer>of(),
Sets.newHashSet("market", "quality", "qualityNumericString", "placement", "placementish", "partial_null_column", "null_column"),
Sets.newHashSet("index", "quality_uniques", "qualityLong", "qualityFloat", "indexMin", "indexMaxPlusTen"),
Sets.newHashSet("market", "quality", "qualityLong", "qualityFloat", "qualityNumericString", "placement", "placementish", "partial_null_column", "null_column"),
Sets.newHashSet("index", "quality_uniques", "indexMin", "indexMaxPlusTen"),
Lists.<EventHolder>newArrayList()
)
)

View File

@ -48,10 +48,11 @@ import io.druid.query.aggregation.DoubleMaxAggregatorFactory;
import io.druid.query.aggregation.DoubleMinAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.FilteredAggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
import io.druid.query.aggregation.first.LongFirstAggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.aggregation.last.LongLastAggregatorFactory;
@ -4738,4 +4739,83 @@ public class TopNQueryRunnerTest
);
assertExpectedResults(expectedResults, query);
}
@Test
public void testFullOnTopNWithAggsOnNumericDims()
{
TopNQuery query = new TopNQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.allGran)
.dimension(QueryRunnerTestHelper.marketDimension)
.metric(QueryRunnerTestHelper.indexMetric)
.threshold(4)
.intervals(QueryRunnerTestHelper.fullOnInterval)
.aggregators(
Lists.<AggregatorFactory>newArrayList(
Iterables.concat(
QueryRunnerTestHelper.commonAggregators,
Lists.newArrayList(
new DoubleMaxAggregatorFactory("maxIndex", "index"),
new DoubleMinAggregatorFactory("minIndex", "index"),
new LongSumAggregatorFactory("qlLong", "qualityLong"),
new DoubleSumAggregatorFactory("qlFloat", "qualityLong"),
new DoubleSumAggregatorFactory("qfFloat", "qualityFloat"),
new LongSumAggregatorFactory("qfLong", "qualityFloat")
)
)
)
)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.build();
List<Result<TopNResultValue>> expectedResults = Arrays.asList(
new Result<TopNResultValue>(
new DateTime("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put(QueryRunnerTestHelper.marketDimension, "total_market")
.put("rows", 186L)
.put("index", 215679.82879638672D)
.put("addRowsIndexConstant", 215866.82879638672D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_2)
.put("maxIndex", 1743.9217529296875D)
.put("minIndex", 792.3260498046875D)
.put("qlLong", 279000L)
.put("qlFloat", 279000.0)
.put("qfFloat", 2790000.0)
.put("qfLong", 2790000L)
.build(),
ImmutableMap.<String, Object>builder()
.put(QueryRunnerTestHelper.marketDimension, "upfront")
.put("rows", 186L)
.put("index", 192046.1060180664D)
.put("addRowsIndexConstant", 192233.1060180664D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_2)
.put("maxIndex", 1870.06103515625D)
.put("minIndex", 545.9906005859375D)
.put("qlLong", 279000L)
.put("qlFloat", 279000.0)
.put("qfFloat", 2790000.0)
.put("qfLong", 2790000L)
.build(),
ImmutableMap.<String, Object>builder()
.put(QueryRunnerTestHelper.marketDimension, "spot")
.put("rows", 837L)
.put("index", 95606.57232284546D)
.put("addRowsIndexConstant", 96444.57232284546D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_9)
.put("maxIndex", 277.2735290527344D)
.put("minIndex", 59.02102279663086D)
.put("qlLong", 1171800L)
.put("qlFloat", 1171800.0)
.put("qfFloat", 11718000.0)
.put("qfLong", 11718000L)
.build()
)
)
)
);
assertExpectedResults(expectedResults, query);
}
}

View File

@ -33,6 +33,9 @@ import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.DimensionSchema;
import io.druid.data.input.impl.DimensionSchema.MultiValueHandling;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.FloatDimensionSchema;
import io.druid.data.input.impl.LongDimensionSchema;
import io.druid.data.input.impl.StringDimensionSchema;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.granularity.Granularities;
@ -2004,6 +2007,127 @@ public class IndexMergerTest
Assert.assertEquals(ImmutableSet.of("A", "B", "C"), ImmutableSet.copyOf(adapter.getAvailableMetrics()));
}
@Test
public void testMergeNumericDims() throws Exception
{
IncrementalIndex toPersist1 = getIndexWithNumericDims();
IncrementalIndex toPersist2 = getIndexWithNumericDims();
final File tmpDir = temporaryFolder.newFolder();
final File tmpDir2 = temporaryFolder.newFolder();
final File tmpDirMerged = temporaryFolder.newFolder();
QueryableIndex index1 = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.persist(
toPersist1,
tmpDir,
indexSpec
)
)
);
QueryableIndex index2 = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.persist(
toPersist2,
tmpDir2,
indexSpec
)
)
);
final QueryableIndex merged = closer.closeLater(
INDEX_IO.loadIndex(
INDEX_MERGER.mergeQueryableIndex(
Arrays.asList(index1, index2),
true,
new AggregatorFactory[]{new CountAggregatorFactory("count")},
tmpDirMerged,
indexSpec
)
)
);
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged);
Iterable<Rowboat> boats = adapter.getRows();
List<Rowboat> boatList = Lists.newArrayList(boats);
Assert.assertEquals(ImmutableList.of("dimA", "dimB", "dimC"), ImmutableList.copyOf(adapter.getDimensionNames()));
Assert.assertEquals(4, boatList.size());
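// Expected order: rows sort by (dimA, dimB, dimC dictionary id). The row that
// omitted dimA and dimB sorts first because missing numeric dimensions are
// stored as 0L / 0.0f rather than null.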
Assert.assertArrayEquals(new Object[]{0L, 0.0f, new int[]{2}}, boatList.get(0).getDims());
Assert.assertArrayEquals(new Object[]{2L}, boatList.get(0).getMetrics());
Assert.assertArrayEquals(new Object[]{72L, 60000.789f, new int[]{3}}, boatList.get(1).getDims());
Assert.assertArrayEquals(new Object[]{2L}, boatList.get(1).getMetrics());
Assert.assertArrayEquals(new Object[]{100L, 4000.567f, new int[]{1}}, boatList.get(2).getDims());
Assert.assertArrayEquals(new Object[]{2L}, boatList.get(2).getMetrics());
Assert.assertArrayEquals(new Object[]{3001L, 1.2345f, new int[]{0}}, boatList.get(3).getDims());
Assert.assertArrayEquals(new Object[]{2L}, boatList.get(3).getMetrics());
}
private IncrementalIndex getIndexWithNumericDims() throws Exception
{
IncrementalIndex index = getIndexWithDimsFromSchemata(
Arrays.asList(
new LongDimensionSchema("dimA"),
new FloatDimensionSchema("dimB"),
new StringDimensionSchema("dimC")
)
);
index.add(
new MapBasedInputRow(
1,
Arrays.asList("dimA", "dimB", "dimC"),
ImmutableMap.<String, Object>of("dimA", 100L, "dimB", 4000.567, "dimC", "Hello")
)
);
index.add(
new MapBasedInputRow(
1,
Arrays.asList("dimA", "dimB", "dimC"),
ImmutableMap.<String, Object>of("dimA", 72L, "dimB", 60000.789, "dimC", "World")
)
);
index.add(
new MapBasedInputRow(
1,
Arrays.asList("dimA", "dimB", "dimC"),
ImmutableMap.<String, Object>of("dimA", 3001L, "dimB", 1.2345, "dimC", "Foobar")
)
);
index.add(
new MapBasedInputRow(
1,
Arrays.asList("dimA", "dimB", "dimC"),
ImmutableMap.<String, Object>of("dimC", "Nully Row")
)
);
return index;
}
private IncrementalIndex getIndexWithDimsFromSchemata(List<DimensionSchema> dims)
{
IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
.withMinTimestamp(0L)
.withQueryGranularity(Granularities.NONE)
.withDimensionsSpec(new DimensionsSpec(dims, null, null))
.withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("count")})
.withRollup(true)
.build();
return new OnheapIncrementalIndex(schema, true, 1000);
}
@Test
public void testPersistNullColumnSkipping() throws Exception
{

View File

@ -124,7 +124,7 @@ public class StringDimensionHandlerTest
String name1 = dimNames1.get(i);
String name2 = dimNames2.get(i);
DimensionHandler handler = handlers.get(name1);
handler.validateSortedEncodedArrays(
handler.validateSortedEncodedKeyComponents(
val1,
val2,
adapter1.getDimValueLookup(name1),

View File

@ -25,7 +25,11 @@ import com.google.common.io.CharSource;
import com.google.common.io.LineProcessor;
import com.google.common.io.Resources;
import io.druid.data.input.impl.DelimitedParseSpec;
import io.druid.data.input.impl.DimensionSchema;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.FloatDimensionSchema;
import io.druid.data.input.impl.LongDimensionSchema;
import io.druid.data.input.impl.StringDimensionSchema;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.hll.HyperLogLogHash;
@ -35,7 +39,6 @@ import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleMaxAggregatorFactory;
import io.druid.query.aggregation.DoubleMinAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.segment.incremental.IncrementalIndex;
@ -50,6 +53,7 @@ import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
/**
@ -75,12 +79,33 @@ public class TestIndex
public static final String[] DIMENSIONS = new String[]{
"market",
"quality",
"qualityLong",
"qualityFloat",
"qualityNumericString",
"placement",
"placementish",
"partial_null_column",
"null_column",
};
"null_column"
};
public static final List<DimensionSchema> DIMENSION_SCHEMAS = Arrays.asList(
new StringDimensionSchema("market"),
new StringDimensionSchema("quality"),
new LongDimensionSchema("qualityLong"),
new FloatDimensionSchema("qualityFloat"),
new StringDimensionSchema("qualityNumericString"),
new StringDimensionSchema("placement"),
new StringDimensionSchema("placementish"),
new StringDimensionSchema("partial_null_column"),
new StringDimensionSchema("null_column")
);
public static final DimensionsSpec DIMENSIONS_SPEC = new DimensionsSpec(
DIMENSION_SCHEMAS,
null,
null
);
public static final String[] METRICS = new String[]{"index", "indexMin", "indexMaxPlusTen"};
private static final Logger log = new Logger(TestIndex.class);
private static final Interval DATA_INTERVAL = new Interval("2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z");
@ -93,9 +118,7 @@ public class TestIndex
new DoubleSumAggregatorFactory(METRICS[0], METRICS[0]),
new DoubleMinAggregatorFactory(METRICS[1], METRICS[0]),
new DoubleMaxAggregatorFactory(METRICS[2], VIRTUAL_COLUMNS.getVirtualColumns()[0].getOutputName()),
new HyperUniquesAggregatorFactory("quality_uniques", "quality"),
new LongSumAggregatorFactory("qualityLong", "qualityLong"),
new DoubleSumAggregatorFactory("qualityFloat", "qualityFloat")
new HyperUniquesAggregatorFactory("quality_uniques", "quality")
};
private static final IndexSpec indexSpec = new IndexSpec();
@ -237,6 +260,7 @@ public class TestIndex
.withMinTimestamp(new DateTime("2011-01-12T00:00:00.000Z").getMillis())
.withTimestampSpec(new TimestampSpec("ds", "auto", null))
.withQueryGranularity(Granularities.NONE)
.withDimensionsSpec(DIMENSIONS_SPEC)
.withVirtualColumns(VIRTUAL_COLUMNS)
.withMetrics(METRIC_AGGS)
.withRollup(rollup)
@ -264,7 +288,7 @@ public class TestIndex
final StringInputRowParser parser = new StringInputRowParser(
new DelimitedParseSpec(
new TimestampSpec("ts", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList(DIMENSIONS)), null, null),
new DimensionsSpec(DIMENSION_SCHEMAS, null, null),
"\t",
"\u0001",
Arrays.asList(COLUMNS)