mirror of https://github.com/apache/druid.git
Support configuration for handling multi-valued dimension (#2541)
* Support configuration for handling multi-valued dimension * Addressed comments * use MultiValueHandling.ofDefault() for missing policy
This commit is contained in:
parent
4203580290
commit
b99e14e732
|
@ -67,18 +67,60 @@ public abstract class DimensionSchema
|
|||
}
|
||||
}
|
||||
|
||||
private final String name;
|
||||
public static enum MultiValueHandling
|
||||
{
|
||||
SORTED_ARRAY,
|
||||
SORTED_SET,
|
||||
ARRAY {
|
||||
@Override
|
||||
public boolean needSorting() { return false;}
|
||||
};
|
||||
|
||||
protected DimensionSchema(String name)
|
||||
public boolean needSorting()
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
@JsonValue
|
||||
public String toString()
|
||||
{
|
||||
return name().toUpperCase();
|
||||
}
|
||||
|
||||
@JsonCreator
|
||||
public static MultiValueHandling fromString(String name)
|
||||
{
|
||||
return name == null ? ofDefault() : valueOf(name.toUpperCase());
|
||||
}
|
||||
|
||||
// this can be system configuration
|
||||
public static MultiValueHandling ofDefault()
|
||||
{
|
||||
return SORTED_ARRAY;
|
||||
}
|
||||
}
|
||||
|
||||
private final String name;
|
||||
private final MultiValueHandling multiValueHandling;
|
||||
|
||||
/**
 * Base constructor for dimension schemas.
 *
 * @param name               dimension name; must not be {@code null}
 * @param multiValueHandling policy for multi-valued rows; stored as given without validation,
 *                           and may be {@code null} (some subclasses pass {@code null})
 * @throws NullPointerException if {@code name} is {@code null}
 */
protected DimensionSchema(String name, MultiValueHandling multiValueHandling)
{
  // Validate first, then assign; checkNotNull throws before any field is written.
  Preconditions.checkNotNull(name, "Dimension name cannot be null.");
  this.name = name;
  this.multiValueHandling = multiValueHandling;
}
|
||||
|
||||
@JsonProperty
|
||||
public String getName()
|
||||
{
|
||||
return name;
|
||||
};
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public MultiValueHandling getMultiValueHandling()
|
||||
{
|
||||
return multiValueHandling;
|
||||
}
|
||||
|
||||
@JsonIgnore
|
||||
public abstract String getTypeName();
|
||||
|
|
|
@ -42,7 +42,20 @@ public class DimensionsSpec
|
|||
private final Set<String> dimensionExclusions;
|
||||
private final Map<String, DimensionSchema> dimensionSchemaMap;
|
||||
|
||||
public static DimensionsSpec ofEmpty()
|
||||
{
|
||||
return new DimensionsSpec(null, null, null);
|
||||
}
|
||||
|
||||
public static List<DimensionSchema> getDefaultSchemas(List<String> dimNames)
|
||||
{
|
||||
return getDefaultSchemas(dimNames, DimensionSchema.MultiValueHandling.ofDefault());
|
||||
}
|
||||
|
||||
public static List<DimensionSchema> getDefaultSchemas(
|
||||
final List<String> dimNames,
|
||||
final DimensionSchema.MultiValueHandling multiValueHandling
|
||||
)
|
||||
{
|
||||
return Lists.transform(
|
||||
dimNames,
|
||||
|
@ -51,7 +64,7 @@ public class DimensionsSpec
|
|||
@Override
|
||||
public DimensionSchema apply(String input)
|
||||
{
|
||||
return new StringDimensionSchema(input);
|
||||
return new StringDimensionSchema(input, multiValueHandling);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
|
|
@ -30,7 +30,7 @@ public class FloatDimensionSchema extends DimensionSchema
|
|||
@JsonProperty("name") String name
|
||||
)
|
||||
{
|
||||
super(name);
|
||||
super(name, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -30,7 +30,7 @@ public class LongDimensionSchema extends DimensionSchema
|
|||
@JsonProperty("name") String name
|
||||
)
|
||||
{
|
||||
super(name);
|
||||
super(name, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -41,7 +41,7 @@ public class NewSpatialDimensionSchema extends DimensionSchema
|
|||
@JsonProperty("dims") List<String> dims
|
||||
)
|
||||
{
|
||||
super(name);
|
||||
super(name, null);
|
||||
this.dims = dims;
|
||||
}
|
||||
|
||||
|
|
|
@ -26,16 +26,23 @@ import com.fasterxml.jackson.annotation.JsonProperty;
|
|||
public class StringDimensionSchema extends DimensionSchema
|
||||
{
|
||||
@JsonCreator
|
||||
public static StringDimensionSchema create(String name) {
|
||||
public static StringDimensionSchema create(String name)
|
||||
{
|
||||
return new StringDimensionSchema(name);
|
||||
}
|
||||
|
||||
@JsonCreator
|
||||
public StringDimensionSchema(
|
||||
@JsonProperty("name") String name
|
||||
@JsonProperty("name") String name,
|
||||
@JsonProperty("multiValueHandling") MultiValueHandling multiValueHandling
|
||||
)
|
||||
{
|
||||
super(name);
|
||||
super(name, multiValueHandling);
|
||||
}
|
||||
|
||||
/**
 * Convenience constructor for a string dimension without an explicit multi-value handling
 * policy; delegates to the two-argument constructor with a {@code null} policy.
 *
 * @param name dimension name
 */
public StringDimensionSchema(String name)
{
  this(name, null);
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package io.druid.segment;
|
||||
|
||||
import io.druid.java.util.common.IAE;
|
||||
import io.druid.data.input.impl.DimensionSchema.MultiValueHandling;
|
||||
import io.druid.segment.column.ColumnCapabilities;
|
||||
import io.druid.segment.column.ValueType;
|
||||
|
||||
|
@ -27,14 +28,20 @@ public final class DimensionHandlerUtil
|
|||
{
|
||||
private DimensionHandlerUtil() {}
|
||||
|
||||
public static DimensionHandler getHandlerFromCapabilities(String dimensionName, ColumnCapabilities capabilities)
|
||||
public static DimensionHandler getHandlerFromCapabilities(
|
||||
String dimensionName,
|
||||
ColumnCapabilities capabilities,
|
||||
MultiValueHandling multiValueHandling
|
||||
)
|
||||
{
|
||||
DimensionHandler handler = null;
|
||||
if (capabilities.getType() == ValueType.STRING) {
|
||||
if (!capabilities.isDictionaryEncoded() || !capabilities.hasBitmapIndexes()) {
|
||||
throw new IAE("String column must have dictionary encoding and bitmap index.");
|
||||
}
|
||||
handler = new StringDimensionHandler(dimensionName);
|
||||
// use default behavior
|
||||
multiValueHandling = multiValueHandling == null ? MultiValueHandling.ofDefault() : multiValueHandling;
|
||||
handler = new StringDimensionHandler(dimensionName, multiValueHandling);
|
||||
}
|
||||
if (handler == null) {
|
||||
throw new IAE("Could not create handler from invalid column type: " + capabilities.getType());
|
||||
|
|
|
@ -166,7 +166,7 @@ public interface DimensionIndexer<EncodedType extends Comparable<EncodedType>, E
|
|||
* Get the minimum dimension value seen by this indexer.
|
||||
*
|
||||
* NOTE:
|
||||
* On an in-memory segment (IncrementaIndex), we can determine min/max values by looking at the stream of
|
||||
* On an in-memory segment (IncrementalIndex), we can determine min/max values by looking at the stream of
|
||||
* row values seen in calls to processSingleRowValToIndexKey().
|
||||
*
|
||||
* However, on a disk-backed segment (QueryableIndex), the numeric dimensions do not currently have any
|
||||
|
@ -259,9 +259,11 @@ public interface DimensionIndexer<EncodedType extends Comparable<EncodedType>, E
|
|||
*/
|
||||
public int getUnsortedEncodedArrayHashCode(EncodedTypeArray key);
|
||||
|
||||
public static final boolean LIST = true;
|
||||
public static final boolean ARRAY = false;
|
||||
|
||||
/**
|
||||
* Given a row value array from a TimeAndDims key, as described in the documentatiion for compareUnsortedEncodedArrays(),
|
||||
* Given a row value array from a TimeAndDims key, as described in the documentation for compareUnsortedEncodedArrays(),
|
||||
* convert the unsorted encoded values to a list or array of actual values.
|
||||
*
|
||||
* If the key has one element, this method should return a single Object instead of an array or list, ignoring
|
||||
|
@ -275,7 +277,7 @@ public interface DimensionIndexer<EncodedType extends Comparable<EncodedType>, E
|
|||
|
||||
|
||||
/**
|
||||
* Given a row value array from a TimeAndDims key, as described in the documentatiion for compareUnsortedEncodedArrays(),
|
||||
* Given a row value array from a TimeAndDims key, as described in the documentation for compareUnsortedEncodedArrays(),
|
||||
* convert the unsorted encoded values to an array of sorted encoded values (i.e., sorted by their corresponding actual values)
|
||||
*
|
||||
* @param key dimension value array from a TimeAndDims key
|
||||
|
|
|
@ -912,7 +912,7 @@ public class IndexMerger
|
|||
for (int i = 0; i < mergedDimensions.size(); i++) {
|
||||
ColumnCapabilities capabilities = dimCapabilities.get(i);
|
||||
String dimName = mergedDimensions.get(i);
|
||||
handlers[i] = DimensionHandlerUtil.getHandlerFromCapabilities(dimName, capabilities);
|
||||
handlers[i] = DimensionHandlerUtil.getHandlerFromCapabilities(dimName, capabilities, null);
|
||||
}
|
||||
return handlers;
|
||||
}
|
||||
|
|
|
@ -124,7 +124,7 @@ public class SimpleQueryableIndex implements QueryableIndex
|
|||
{
|
||||
for (String dim : availableDimensions) {
|
||||
ColumnCapabilities capabilities = getColumn(dim).getCapabilities();
|
||||
DimensionHandler handler = DimensionHandlerUtil.getHandlerFromCapabilities(dim, capabilities);
|
||||
DimensionHandler handler = DimensionHandlerUtil.getHandlerFromCapabilities(dim, capabilities, null);
|
||||
dimensionHandlers.put(dim, handler);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,9 +19,8 @@
|
|||
|
||||
package io.druid.segment;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.primitives.Ints;
|
||||
import io.druid.java.util.common.logger.Logger;
|
||||
import io.druid.data.input.impl.DimensionSchema.MultiValueHandling;
|
||||
import io.druid.segment.column.Column;
|
||||
import io.druid.segment.column.ColumnCapabilities;
|
||||
import io.druid.segment.column.DictionaryEncodedColumn;
|
||||
|
@ -37,13 +36,13 @@ import java.util.Comparator;
|
|||
|
||||
public class StringDimensionHandler implements DimensionHandler<Integer, int[], String>
|
||||
{
|
||||
private static final Logger log = new Logger(StringDimensionHandler.class);
|
||||
|
||||
private final String dimensionName;
|
||||
private final MultiValueHandling multiValueHandling;
|
||||
|
||||
public StringDimensionHandler(String dimensionName)
|
||||
public StringDimensionHandler(String dimensionName, MultiValueHandling multiValueHandling)
|
||||
{
|
||||
this.dimensionName = dimensionName;
|
||||
this.multiValueHandling = multiValueHandling;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -193,7 +192,7 @@ public class StringDimensionHandler implements DimensionHandler<Integer, int[],
|
|||
@Override
|
||||
public DimensionIndexer<Integer, int[], String> makeIndexer()
|
||||
{
|
||||
return new StringDimensionIndexer();
|
||||
return new StringDimensionIndexer(multiValueHandling);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -220,21 +219,6 @@ public class StringDimensionHandler implements DimensionHandler<Integer, int[],
|
|||
return new StringDimensionMergerLegacy(dimensionName, indexSpec, outDir, ioPeon, capabilities, progress);
|
||||
}
|
||||
|
||||
public static final Function<Object, String> STRING_TRANSFORMER = new Function<Object, String>()
|
||||
{
|
||||
@Override
|
||||
public String apply(final Object o)
|
||||
{
|
||||
if (o == null) {
|
||||
return null;
|
||||
}
|
||||
if (o instanceof String) {
|
||||
return (String) o;
|
||||
}
|
||||
return o.toString();
|
||||
}
|
||||
};
|
||||
|
||||
public static final Comparator<Integer> ENCODED_COMPARATOR = new Comparator<Integer>()
|
||||
{
|
||||
@Override
|
||||
|
@ -250,18 +234,4 @@ public class StringDimensionHandler implements DimensionHandler<Integer, int[],
|
|||
}
|
||||
};
|
||||
|
||||
public static final Comparator<String> UNENCODED_COMPARATOR = new Comparator<String>()
|
||||
{
|
||||
@Override
|
||||
public int compare(String o1, String o2)
|
||||
{
|
||||
if (o1 == null) {
|
||||
return o2 == null ? 0 : -1;
|
||||
}
|
||||
if (o2 == null) {
|
||||
return 1;
|
||||
}
|
||||
return o1.compareTo(o2);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package io.druid.segment;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
|
@ -26,7 +27,7 @@ import com.google.common.collect.Maps;
|
|||
import com.google.common.primitives.Ints;
|
||||
import com.metamx.collections.bitmap.BitmapFactory;
|
||||
import com.metamx.collections.bitmap.MutableBitmap;
|
||||
import io.druid.java.util.common.logger.Logger;
|
||||
import io.druid.data.input.impl.DimensionSchema.MultiValueHandling;
|
||||
import io.druid.query.dimension.DimensionSpec;
|
||||
import io.druid.query.extraction.ExtractionFn;
|
||||
import io.druid.query.filter.DruidPredicateFactory;
|
||||
|
@ -45,13 +46,42 @@ import it.unimi.dsi.fastutil.ints.IntLists;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Comparator;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class StringDimensionIndexer implements DimensionIndexer<Integer, int[], String>
|
||||
{
|
||||
private static final Logger log = new Logger(StringDimensionIndexer.class);
|
||||
public static final Function<Object, String> STRING_TRANSFORMER = new Function<Object, String>()
|
||||
{
|
||||
@Override
|
||||
public String apply(final Object o)
|
||||
{
|
||||
if (o == null) {
|
||||
return null;
|
||||
}
|
||||
if (o instanceof String) {
|
||||
return (String) o;
|
||||
}
|
||||
return o.toString();
|
||||
}
|
||||
};
|
||||
|
||||
public static final Comparator<String> UNENCODED_COMPARATOR = new Comparator<String>()
|
||||
{
|
||||
@Override
|
||||
public int compare(String o1, String o2)
|
||||
{
|
||||
if (o1 == null) {
|
||||
return o2 == null ? 0 : -1;
|
||||
}
|
||||
if (o2 == null) {
|
||||
return 1;
|
||||
}
|
||||
return o1.compareTo(o2);
|
||||
}
|
||||
};
|
||||
|
||||
private static class DimensionDictionary
|
||||
{
|
||||
|
@ -176,12 +206,14 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
|
|||
}
|
||||
}
|
||||
|
||||
private DimensionDictionary dimLookup;
|
||||
private final DimensionDictionary dimLookup;
|
||||
private final MultiValueHandling multiValueHandling;
|
||||
private SortedDimensionDictionary sortedLookup;
|
||||
|
||||
public StringDimensionIndexer()
|
||||
public StringDimensionIndexer(MultiValueHandling multiValueHandling)
|
||||
{
|
||||
this.dimLookup = new DimensionDictionary();
|
||||
this.multiValueHandling = multiValueHandling == null ? MultiValueHandling.ofDefault() : multiValueHandling;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -195,21 +227,37 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
|
|||
encodedDimensionValues = null;
|
||||
} else if (dimValues instanceof List) {
|
||||
List<Object> dimValuesList = (List) dimValues;
|
||||
if (dimValuesList.size() == 1) {
|
||||
encodedDimensionValues = new int[]{dimLookup.add(STRING_TRANSFORMER.apply(dimValuesList.get(0)))};
|
||||
} else {
|
||||
final String[] dimensionValues = new String[dimValuesList.size()];
|
||||
for (int i = 0; i < dimValuesList.size(); i++) {
|
||||
dimensionValues[i] = STRING_TRANSFORMER.apply(dimValuesList.get(i));
|
||||
}
|
||||
if (multiValueHandling.needSorting()) {
|
||||
// Sort multival row by their unencoded values first.
|
||||
Arrays.sort(dimensionValues, UNENCODED_COMPARATOR);
|
||||
}
|
||||
|
||||
// Sort multival row by their unencoded values first.
|
||||
final String[] dimensionValues = new String[dimValuesList.size()];
|
||||
for (int i = 0; i < dimValuesList.size(); i++) {
|
||||
dimensionValues[i] = StringDimensionHandler.STRING_TRANSFORMER.apply(dimValuesList.get(i));
|
||||
}
|
||||
Arrays.sort(dimensionValues, StringDimensionHandler.UNENCODED_COMPARATOR);
|
||||
final int[] retVal = new int[dimensionValues.length];
|
||||
|
||||
encodedDimensionValues = new int[dimensionValues.length];
|
||||
for (int i = 0; i < dimensionValues.length; i++) {
|
||||
encodedDimensionValues[i] = dimLookup.add(dimensionValues[i]);
|
||||
int prevId = -1;
|
||||
int pos = 0;
|
||||
for (int i = 0; i < dimensionValues.length; i++) {
|
||||
if (multiValueHandling != MultiValueHandling.SORTED_SET) {
|
||||
retVal[pos++] = dimLookup.add(dimensionValues[i]);
|
||||
continue;
|
||||
}
|
||||
int index = dimLookup.add(dimensionValues[i]);
|
||||
if (index != prevId) {
|
||||
prevId = retVal[pos++] = index;
|
||||
}
|
||||
}
|
||||
|
||||
encodedDimensionValues = pos == retVal.length ? retVal : Arrays.copyOf(retVal, pos);
|
||||
}
|
||||
} else {
|
||||
String transformedVal = StringDimensionHandler.STRING_TRANSFORMER.apply(dimValues);
|
||||
encodedDimensionValues = new int[]{dimLookup.add(transformedVal)};
|
||||
encodedDimensionValues = new int[]{dimLookup.add(STRING_TRANSFORMER.apply(dimValues))};
|
||||
}
|
||||
|
||||
// If dictionary size has changed, the sorted lookup is no longer valid.
|
||||
|
@ -223,21 +271,18 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
|
|||
@Override
|
||||
public Integer getSortedEncodedValueFromUnsorted(Integer unsortedIntermediateValue)
|
||||
{
|
||||
updateSortedLookup();
|
||||
return sortedLookup.getSortedIdFromUnsortedId(unsortedIntermediateValue);
|
||||
return sortedLookup().getSortedIdFromUnsortedId(unsortedIntermediateValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer getUnsortedEncodedValueFromSorted(Integer sortedIntermediateValue)
|
||||
{
|
||||
updateSortedLookup();
|
||||
return sortedLookup.getUnsortedIdFromSortedId(sortedIntermediateValue);
|
||||
return sortedLookup().getUnsortedIdFromSortedId(sortedIntermediateValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Indexed<String> getSortedIndexedValues()
|
||||
{
|
||||
updateSortedLookup();
|
||||
return new Indexed<String>()
|
||||
{
|
||||
@Override
|
||||
|
@ -491,7 +536,7 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
|
|||
final int dimIndex
|
||||
)
|
||||
{
|
||||
final String value = StringDimensionHandler.STRING_TRANSFORMER.apply(matchValue);
|
||||
final String value = STRING_TRANSFORMER.apply(matchValue);
|
||||
final int encodedVal = getEncodedValue(value, false);
|
||||
final boolean matchOnNull = Strings.isNullOrEmpty(value);
|
||||
if (encodedVal < 0 && !matchOnNull) {
|
||||
|
@ -558,18 +603,15 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
|
|||
};
|
||||
}
|
||||
|
||||
private void updateSortedLookup()
|
||||
private SortedDimensionDictionary sortedLookup()
|
||||
{
|
||||
if (sortedLookup == null) {
|
||||
sortedLookup = dimLookup.sort();
|
||||
}
|
||||
return sortedLookup == null ? sortedLookup = dimLookup.sort() : sortedLookup;
|
||||
}
|
||||
|
||||
private String getActualValue(int intermediateValue, boolean idSorted)
|
||||
{
|
||||
if (idSorted) {
|
||||
updateSortedLookup();
|
||||
return sortedLookup.getValueFromSortedId(intermediateValue);
|
||||
return sortedLookup().getValueFromSortedId(intermediateValue);
|
||||
} else {
|
||||
return dimLookup.getValue(intermediateValue);
|
||||
|
||||
|
@ -581,8 +623,7 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
|
|||
int unsortedId = dimLookup.getId(fullValue);
|
||||
|
||||
if (idSorted) {
|
||||
updateSortedLookup();
|
||||
return sortedLookup.getSortedIdFromUnsortedId(unsortedId);
|
||||
return sortedLookup().getSortedIdFromUnsortedId(unsortedId);
|
||||
} else {
|
||||
return unsortedId;
|
||||
}
|
||||
|
|
|
@ -409,7 +409,11 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
|
|||
if (dimSchema.getTypeName().equals(DimensionSchema.SPATIAL_TYPE_NAME)) {
|
||||
capabilities.setHasSpatialIndexes(true);
|
||||
} else {
|
||||
DimensionHandler handler = DimensionHandlerUtil.getHandlerFromCapabilities(dimName, capabilities);
|
||||
DimensionHandler handler = DimensionHandlerUtil.getHandlerFromCapabilities(
|
||||
dimName,
|
||||
capabilities,
|
||||
dimSchema.getMultiValueHandling()
|
||||
);
|
||||
addNewDimension(dimName, capabilities, handler);
|
||||
}
|
||||
columnCapabilities.put(dimName, capabilities);
|
||||
|
@ -556,7 +560,6 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
|
|||
for (String dimension : rowDimensions) {
|
||||
boolean wasNewDim = false;
|
||||
ColumnCapabilitiesImpl capabilities;
|
||||
ValueType valType = null;
|
||||
DimensionDesc desc = dimensionDescs.get(dimension);
|
||||
if (desc != null) {
|
||||
capabilities = desc.getCapabilities();
|
||||
|
@ -571,7 +574,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
|
|||
capabilities.setHasBitmapIndexes(true);
|
||||
columnCapabilities.put(dimension, capabilities);
|
||||
}
|
||||
DimensionHandler handler = DimensionHandlerUtil.getHandlerFromCapabilities(dimension, capabilities);
|
||||
DimensionHandler handler = DimensionHandlerUtil.getHandlerFromCapabilities(dimension, capabilities, null);
|
||||
desc = addNewDimension(dimension, capabilities, handler);
|
||||
}
|
||||
DimensionHandler handler = desc.getHandler();
|
||||
|
@ -751,7 +754,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
|
|||
if (dimensionDescs.get(dim) == null) {
|
||||
ColumnCapabilitiesImpl capabilities = oldColumnCapabilities.get(dim);
|
||||
columnCapabilities.put(dim, capabilities);
|
||||
DimensionHandler handler = DimensionHandlerUtil.getHandlerFromCapabilities(dim, capabilities);
|
||||
DimensionHandler handler = DimensionHandlerUtil.getHandlerFromCapabilities(dim, capabilities, null);
|
||||
addNewDimension(dim, capabilities, handler);
|
||||
}
|
||||
}
|
||||
|
@ -852,7 +855,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
|
|||
continue;
|
||||
}
|
||||
final DimensionIndexer indexer = dimensionDesc.getIndexer();
|
||||
Object rowVals = indexer.convertUnsortedEncodedArrayToActualArrayOrList(dim, true);
|
||||
Object rowVals = indexer.convertUnsortedEncodedArrayToActualArrayOrList(dim, DimensionIndexer.LIST);
|
||||
theVals.put(dimensionName, rowVals);
|
||||
}
|
||||
|
||||
|
|
|
@ -135,7 +135,7 @@ public class IncrementalIndexSchema
|
|||
|
||||
public Builder withDimensionsSpec(DimensionsSpec dimensionsSpec)
|
||||
{
|
||||
this.dimensionsSpec = dimensionsSpec;
|
||||
this.dimensionsSpec = dimensionsSpec == null ? DimensionsSpec.ofEmpty() : dimensionsSpec;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
|
@ -530,8 +530,9 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
|
|||
return null;
|
||||
}
|
||||
|
||||
Object dimVals = indexer.convertUnsortedEncodedArrayToActualArrayOrList(dims[dimensionIndex], false);
|
||||
return dimVals;
|
||||
return indexer.convertUnsortedEncodedArrayToActualArrayOrList(
|
||||
dims[dimensionIndex], DimensionIndexer.ARRAY
|
||||
);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ package io.druid.segment.incremental;
|
|||
import com.google.common.base.Supplier;
|
||||
import com.google.common.collect.Maps;
|
||||
import io.druid.data.input.InputRow;
|
||||
import io.druid.data.input.impl.DimensionsSpec;
|
||||
import io.druid.granularity.QueryGranularity;
|
||||
import io.druid.java.util.common.logger.Logger;
|
||||
import io.druid.java.util.common.parsers.ParseException;
|
||||
|
@ -97,13 +98,15 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
|
|||
long minTimestamp,
|
||||
QueryGranularity gran,
|
||||
boolean rollup,
|
||||
final AggregatorFactory[] metrics,
|
||||
DimensionsSpec dimensionsSpec,
|
||||
AggregatorFactory[] metrics,
|
||||
int maxRowCount
|
||||
)
|
||||
{
|
||||
this(
|
||||
new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp)
|
||||
.withQueryGranularity(gran)
|
||||
.withDimensionsSpec(dimensionsSpec)
|
||||
.withMetrics(metrics)
|
||||
.withRollup(rollup)
|
||||
.build(),
|
||||
|
@ -125,6 +128,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
|
|||
minTimestamp,
|
||||
gran,
|
||||
IncrementalIndexSchema.DEFAULT_ROLLUP,
|
||||
null,
|
||||
metrics,
|
||||
maxRowCount
|
||||
);
|
||||
|
|
|
@ -28,7 +28,10 @@ import com.google.common.collect.Lists;
|
|||
import com.google.common.collect.Sets;
|
||||
import com.google.common.primitives.Ints;
|
||||
import com.metamx.collections.bitmap.RoaringBitmapFactory;
|
||||
import io.druid.data.input.InputRow;
|
||||
import io.druid.data.input.MapBasedInputRow;
|
||||
import io.druid.data.input.impl.DimensionSchema;
|
||||
import io.druid.data.input.impl.DimensionSchema.MultiValueHandling;
|
||||
import io.druid.data.input.impl.DimensionsSpec;
|
||||
import io.druid.granularity.QueryGranularities;
|
||||
import io.druid.java.util.common.IAE;
|
||||
|
@ -39,6 +42,7 @@ import io.druid.query.aggregation.CountAggregatorFactory;
|
|||
import io.druid.query.aggregation.LongSumAggregatorFactory;
|
||||
import io.druid.segment.column.Column;
|
||||
import io.druid.segment.column.ColumnCapabilitiesImpl;
|
||||
import io.druid.segment.column.DictionaryEncodedColumn;
|
||||
import io.druid.segment.column.SimpleDictionaryEncodedColumn;
|
||||
import io.druid.segment.data.BitmapSerdeFactory;
|
||||
import io.druid.segment.data.CompressedObjectStrategy;
|
||||
|
@ -795,11 +799,21 @@ public class IndexMergerTest
|
|||
return;
|
||||
}
|
||||
|
||||
Object encodedColumn = index.getColumn("dim2").getDictionaryEncoding();
|
||||
Field field = SimpleDictionaryEncodedColumn.class.getDeclaredField("column");
|
||||
field.setAccessible(true);
|
||||
DictionaryEncodedColumn encodedColumn = index.getColumn("dim2").getDictionaryEncoding();
|
||||
Object obj;
|
||||
if (encodedColumn.hasMultipleValues()) {
|
||||
Field field = SimpleDictionaryEncodedColumn.class.getDeclaredField("multiValueColumn");
|
||||
field.setAccessible(true);
|
||||
|
||||
Object obj = field.get(encodedColumn);
|
||||
obj = field.get(encodedColumn);
|
||||
} else {
|
||||
Field field = SimpleDictionaryEncodedColumn.class.getDeclaredField("column");
|
||||
field.setAccessible(true);
|
||||
|
||||
obj = field.get(encodedColumn);
|
||||
}
|
||||
// CompressedVSizeIntsIndexedSupplier$CompressedByteSizeIndexedInts
|
||||
// CompressedVSizeIndexedSupplier$CompressedVSizeIndexed
|
||||
Field compressedSupplierField = obj.getClass().getDeclaredField("this$0");
|
||||
compressedSupplierField.setAccessible(true);
|
||||
|
||||
|
@ -1716,11 +1730,13 @@ public class IndexMergerTest
|
|||
IncrementalIndex index1 = IncrementalIndexTest.createIndex(new AggregatorFactory[]{
|
||||
new LongSumAggregatorFactory("A", "A")
|
||||
});
|
||||
index1.add(new MapBasedInputRow(
|
||||
1L,
|
||||
Lists.newArrayList("d1", "d2"),
|
||||
ImmutableMap.<String, Object>of("d1", "a", "d2", "z", "A", 1)
|
||||
));
|
||||
index1.add(
|
||||
new MapBasedInputRow(
|
||||
1L,
|
||||
Lists.newArrayList("d1", "d2"),
|
||||
ImmutableMap.<String, Object>of("d1", "a", "d2", "z", "A", 1)
|
||||
)
|
||||
);
|
||||
closer.closeLater(index1);
|
||||
|
||||
IncrementalIndex index2 = IncrementalIndexTest.createIndex(new AggregatorFactory[]{
|
||||
|
@ -2170,4 +2186,113 @@ public class IndexMergerTest
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultiValueHandling() throws Exception
|
||||
{
|
||||
InputRow[] rows = new InputRow[]{
|
||||
new MapBasedInputRow(
|
||||
1,
|
||||
Arrays.asList("dim1", "dim2"),
|
||||
ImmutableMap.<String, Object>of(
|
||||
"dim1", Arrays.asList("x", "a", "a", "b"),
|
||||
"dim2", Arrays.asList("a", "x", "b", "x")
|
||||
)
|
||||
),
|
||||
new MapBasedInputRow(
|
||||
1,
|
||||
Arrays.asList("dim1", "dim2"),
|
||||
ImmutableMap.<String, Object>of(
|
||||
"dim1", Arrays.asList("a", "b", "x"),
|
||||
"dim2", Arrays.asList("x", "a", "b")
|
||||
)
|
||||
)
|
||||
};
|
||||
|
||||
List<DimensionSchema> schema;
|
||||
QueryableIndex index;
|
||||
QueryableIndexIndexableAdapter adapter;
|
||||
List<Rowboat> boatList;
|
||||
|
||||
// xaab-axbx + abx-xab --> aabx-abxx + abx-abx --> abx-abx + aabx-abxx
|
||||
schema = DimensionsSpec.getDefaultSchemas(Arrays.asList("dim1", "dim2"), MultiValueHandling.SORTED_ARRAY);
|
||||
index = persistAndLoad(schema, rows);
|
||||
adapter = new QueryableIndexIndexableAdapter(index);
|
||||
boatList = ImmutableList.copyOf(adapter.getRows());
|
||||
|
||||
Assert.assertEquals(2, index.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index.getAvailableDimensions()));
|
||||
Assert.assertEquals(3, index.getColumnNames().size());
|
||||
|
||||
Assert.assertEquals(2, boatList.size());
|
||||
Assert.assertArrayEquals(new int[][]{{0, 1, 2}, {0, 1, 2}}, boatList.get(0).getDims());
|
||||
Assert.assertArrayEquals(new int[][]{{0, 0, 1, 2}, {0, 1, 2, 2}}, boatList.get(1).getDims());
|
||||
|
||||
checkBitmapIndex(new ArrayList<Integer>(), adapter.getBitmapIndex("dim1", ""));
|
||||
checkBitmapIndex(Lists.newArrayList(0, 1), adapter.getBitmapIndex("dim1", "a"));
|
||||
checkBitmapIndex(Lists.newArrayList(0, 1), adapter.getBitmapIndex("dim1", "b"));
|
||||
checkBitmapIndex(Lists.newArrayList(0, 1), adapter.getBitmapIndex("dim1", "x"));
|
||||
|
||||
checkBitmapIndex(Lists.newArrayList(0, 1), adapter.getBitmapIndex("dim2", "a"));
|
||||
checkBitmapIndex(Lists.newArrayList(0, 1), adapter.getBitmapIndex("dim2", "b"));
|
||||
checkBitmapIndex(Lists.newArrayList(0, 1), adapter.getBitmapIndex("dim2", "x"));
|
||||
|
||||
// xaab-axbx + abx-xab --> abx-abx + abx-abx --> abx-abx
|
||||
schema = DimensionsSpec.getDefaultSchemas(Arrays.asList("dim1", "dim2"), MultiValueHandling.SORTED_SET);
|
||||
index = persistAndLoad(schema, rows);
|
||||
|
||||
Assert.assertEquals(1, index.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index.getAvailableDimensions()));
|
||||
Assert.assertEquals(3, index.getColumnNames().size());
|
||||
|
||||
adapter = new QueryableIndexIndexableAdapter(index);
|
||||
boatList = ImmutableList.copyOf(adapter.getRows());
|
||||
|
||||
Assert.assertEquals(1, boatList.size());
|
||||
Assert.assertArrayEquals(new int[][]{{0, 1, 2}, {0, 1, 2}}, boatList.get(0).getDims());
|
||||
|
||||
checkBitmapIndex(new ArrayList<Integer>(), adapter.getBitmapIndex("dim1", ""));
|
||||
checkBitmapIndex(Lists.newArrayList(0), adapter.getBitmapIndex("dim1", "a"));
|
||||
checkBitmapIndex(Lists.newArrayList(0), adapter.getBitmapIndex("dim1", "b"));
|
||||
checkBitmapIndex(Lists.newArrayList(0), adapter.getBitmapIndex("dim1", "x"));
|
||||
|
||||
checkBitmapIndex(Lists.newArrayList(0), adapter.getBitmapIndex("dim2", "a"));
|
||||
checkBitmapIndex(Lists.newArrayList(0), adapter.getBitmapIndex("dim2", "b"));
|
||||
checkBitmapIndex(Lists.newArrayList(0), adapter.getBitmapIndex("dim2", "x"));
|
||||
|
||||
// xaab-axbx + abx-xab --> abx-xab + xaab-axbx
|
||||
schema = DimensionsSpec.getDefaultSchemas(Arrays.asList("dim1", "dim2"), MultiValueHandling.ARRAY);
|
||||
index = persistAndLoad(schema, rows);
|
||||
|
||||
Assert.assertEquals(2, index.getColumn(Column.TIME_COLUMN_NAME).getLength());
|
||||
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index.getAvailableDimensions()));
|
||||
Assert.assertEquals(3, index.getColumnNames().size());
|
||||
|
||||
adapter = new QueryableIndexIndexableAdapter(index);
|
||||
boatList = ImmutableList.copyOf(adapter.getRows());
|
||||
|
||||
Assert.assertEquals(2, boatList.size());
|
||||
Assert.assertArrayEquals(new int[][]{{0, 1, 2}, {2, 0, 1}}, boatList.get(0).getDims());
|
||||
Assert.assertArrayEquals(new int[][]{{2, 0, 0, 1}, {0, 2, 1, 2}}, boatList.get(1).getDims());
|
||||
|
||||
checkBitmapIndex(new ArrayList<Integer>(), adapter.getBitmapIndex("dim1", ""));
|
||||
checkBitmapIndex(Lists.newArrayList(0, 1), adapter.getBitmapIndex("dim1", "a"));
|
||||
checkBitmapIndex(Lists.newArrayList(0, 1), adapter.getBitmapIndex("dim1", "b"));
|
||||
checkBitmapIndex(Lists.newArrayList(0, 1), adapter.getBitmapIndex("dim1", "x"));
|
||||
|
||||
checkBitmapIndex(Lists.newArrayList(0, 1), adapter.getBitmapIndex("dim2", "a"));
|
||||
checkBitmapIndex(Lists.newArrayList(0, 1), adapter.getBitmapIndex("dim2", "b"));
|
||||
checkBitmapIndex(Lists.newArrayList(0, 1), adapter.getBitmapIndex("dim2", "x"));
|
||||
}
|
||||
|
||||
/**
 * Test helper: builds an in-memory index from the given rows, persists it to a fresh temporary
 * folder and loads the persisted result back.
 *
 * @param schema dimension schemas wrapped into the {@code DimensionsSpec} handed to the index
 * @param rows   input rows added to the index, in the order given, before persisting
 * @return the loaded index, after being passed through {@code closer.closeLater(...)}
 * @throws IOException declared by this helper; which of the calls below raise it is not visible here
 */
private QueryableIndex persistAndLoad(List<DimensionSchema> schema, InputRow... rows) throws IOException
|
||||
{
|
||||
// null aggregators: the two-argument createIndex visible elsewhere in this listing substitutes
// its default aggregator factories for null.
IncrementalIndex toPersist = IncrementalIndexTest.createIndex(null, new DimensionsSpec(schema, null, null));
|
||||
for (InputRow row : rows) {
|
||||
toPersist.add(row);
|
||||
}
|
||||
|
||||
final File tempDir = temporaryFolder.newFolder();
|
||||
// persist -> load -> register with closer; presumably closer releases the index at teardown (confirm).
return closer.closeLater(INDEX_IO.loadIndex(INDEX_MERGER.persist(toPersist, tempDir, indexSpec)));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -196,6 +196,19 @@ public class IncrementalIndexTest
|
|||
return defaultCombiningAggregatorFactories;
|
||||
}
|
||||
|
||||
/**
 * Creates an {@code OnheapIncrementalIndex} for tests using an explicit dimensions spec.
 *
 * @param aggregatorFactories aggregators for the index; when {@code null},
 *                            {@code defaultAggregatorFactories} is used instead
 * @param dimensionsSpec      dimensions spec passed through unchanged to the index constructor
 * @return a new on-heap incremental index
 */
public static IncrementalIndex createIndex(
|
||||
AggregatorFactory[] aggregatorFactories,
|
||||
DimensionsSpec dimensionsSpec)
|
||||
{
|
||||
if (null == aggregatorFactories) {
|
||||
aggregatorFactories = defaultAggregatorFactories;
|
||||
}
|
||||
|
||||
// NOTE(review): the meaning of the positional arguments 0L, true and 1000000 is defined by the
// OnheapIncrementalIndex constructor, which is not visible here - confirm before changing them.
return new OnheapIncrementalIndex(
|
||||
0L, QueryGranularities.NONE, true, dimensionsSpec, aggregatorFactories, 1000000
|
||||
);
|
||||
}
|
||||
|
||||
public static IncrementalIndex createIndex(AggregatorFactory[] aggregatorFactories)
|
||||
{
|
||||
if (null == aggregatorFactories) {
|
||||
|
@ -203,7 +216,7 @@ public class IncrementalIndexTest
|
|||
}
|
||||
|
||||
return new OnheapIncrementalIndex(
|
||||
0L, QueryGranularities.NONE, aggregatorFactories, 1000000
|
||||
0L, QueryGranularities.NONE, true, null, aggregatorFactories, 1000000
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -214,7 +227,7 @@ public class IncrementalIndexTest
|
|||
}
|
||||
|
||||
return new OnheapIncrementalIndex(
|
||||
0L, QueryGranularities.NONE, false, aggregatorFactories, 1000000
|
||||
0L, QueryGranularities.NONE, false, null, aggregatorFactories, 1000000
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -33,7 +33,7 @@ public class RowboatTest
|
|||
/**
 * Builds an array of {@code size} string dimension handlers, where the handler at index
 * {@code i} is constructed with the name {@code String.valueOf(i)}.
 *
 * @param size number of handlers to create
 * @return the populated handler array
 */
private static DimensionHandler[] getDefaultHandlers(int size) {
|
||||
DimensionHandler[] handlers = new DimensionHandler[size];
|
||||
for (int i = 0; i < size; i++) {
|
||||
// NOTE(review): this listing appears to be a rendered diff hunk with its +/- markers stripped;
// the one-argument assignment looks like the removed line and the two-argument one its
// replacement - confirm against the real file. Read literally, the second assignment overwrites
// the first. The null second argument is presumably the multi-value handling policy (confirm).
handlers[i] = new StringDimensionHandler(String.valueOf(i));
|
||||
handlers[i] = new StringDimensionHandler(String.valueOf(i), null);
|
||||
}
|
||||
return handlers;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,92 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.segment.incremental;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import io.druid.data.input.MapBasedInputRow;
|
||||
import io.druid.data.input.Row;
|
||||
import io.druid.data.input.impl.DimensionSchema;
|
||||
import io.druid.data.input.impl.DimensionsSpec;
|
||||
import io.druid.data.input.impl.StringDimensionSchema;
|
||||
import io.druid.data.input.impl.TimestampSpec;
|
||||
import io.druid.granularity.QueryGranularities;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Checks how an on-heap incremental index stores a multi-valued string dimension under each
* {@code DimensionSchema.MultiValueHandling} option (ARRAY, SORTED_ARRAY, SORTED_SET).
*/
|
||||
public class IncrementalIndexMultiValueSpecTest
|
||||
{
|
||||
// Adds one row whose string dimensions all carry the values ["xsd", "aba", "fds", "aba"] and
// asserts the stored form per handling mode: ARRAY keeps input order and duplicates,
// SORTED_ARRAY sorts and keeps duplicates, SORTED_SET sorts and drops duplicates.
@Test
|
||||
public void test() throws IndexSizeExceededException
|
||||
{
|
||||
// One string dimension per handling mode; only these three dimensions are declared in the spec.
DimensionsSpec dimensionsSpec = new DimensionsSpec(
|
||||
Arrays.<DimensionSchema>asList(
|
||||
new StringDimensionSchema("string1", DimensionSchema.MultiValueHandling.ARRAY),
|
||||
new StringDimensionSchema("string2", DimensionSchema.MultiValueHandling.SORTED_ARRAY),
|
||||
new StringDimensionSchema("string3", DimensionSchema.MultiValueHandling.SORTED_SET)
|
||||
),
|
||||
null, null
|
||||
);
|
||||
IncrementalIndexSchema schema = new IncrementalIndexSchema(
|
||||
0,
|
||||
new TimestampSpec("ds", "auto", null),
|
||||
QueryGranularities.ALL,
|
||||
dimensionsSpec,
|
||||
new AggregatorFactory[0],
|
||||
false
|
||||
);
|
||||
// The map itself stays empty; only get() is overridden, so values are synthesized from the key
// prefix ("string", "float", "long") and any other key yields null.
Map<String, Object> map = new HashMap<String, Object>()
|
||||
{
|
||||
@Override
|
||||
public Object get(Object key)
|
||||
{
|
||||
if (((String) key).startsWith("string")) {
|
||||
return Arrays.asList("xsd", "aba", "fds", "aba");
|
||||
}
|
||||
if (((String) key).startsWith("float")) {
|
||||
return Arrays.<Float>asList(3.92f, -2.76f, 42.153f, Float.NaN, -2.76f, -2.76f);
|
||||
}
|
||||
if (((String) key).startsWith("long")) {
|
||||
return Arrays.<Long>asList(-231238789L, 328L, 923L, 328L, -2L, 0L);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
};
|
||||
IncrementalIndex<?> index = new OnheapIncrementalIndex(schema, true, 10000);
|
||||
// The row also lists float*/long* dimension names, but nothing below asserts on them.
index.add(
|
||||
new MapBasedInputRow(
|
||||
0, Arrays.asList(
|
||||
"string1", "string2", "string3", "float1", "float2", "float3", "long1", "long2", "long3"
|
||||
), map
|
||||
)
|
||||
);
|
||||
|
||||
// Only the first row returned by the index iterator is inspected.
Row row = index.iterator().next();
|
||||
Assert.assertEquals(Lists.newArrayList("xsd", "aba", "fds", "aba"), row.getRaw("string1"));
|
||||
Assert.assertEquals(Lists.newArrayList("aba", "aba", "fds", "xsd"), row.getRaw("string2"));
|
||||
Assert.assertEquals(Lists.newArrayList("aba", "fds", "xsd"), row.getRaw("string3"));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue