mirror of https://github.com/apache/druid.git
Merge pull request #495 from metamx/dim-extraction-null-values
Support null values in DimExtractionFn in topN and groupBy
This commit is contained in:
commit
76437379d7
2
pom.xml
2
pom.xml
|
@ -41,7 +41,7 @@
|
||||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||||
<metamx.java-util.version>0.25.5</metamx.java-util.version>
|
<metamx.java-util.version>0.25.5</metamx.java-util.version>
|
||||||
<apache.curator.version>2.4.0</apache.curator.version>
|
<apache.curator.version>2.4.0</apache.curator.version>
|
||||||
<druid.api.version>0.1.11</druid.api.version>
|
<druid.api.version>0.1.12</druid.api.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
<modules>
|
<modules>
|
||||||
|
|
|
@ -96,10 +96,7 @@ public class DimExtractionTopNAlgorithm extends BaseTopNAlgorithm<Aggregator[][]
|
||||||
Aggregator[] theAggregators = rowSelector[dimIndex];
|
Aggregator[] theAggregators = rowSelector[dimIndex];
|
||||||
if (theAggregators == null) {
|
if (theAggregators == null) {
|
||||||
String key = query.getDimensionSpec().getDimExtractionFn().apply(dimSelector.lookupName(dimIndex));
|
String key = query.getDimensionSpec().getDimExtractionFn().apply(dimSelector.lookupName(dimIndex));
|
||||||
if (key == null) {
|
// null keys are allowed
|
||||||
rowSelector[dimIndex] = EMPTY_ARRAY;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
theAggregators = aggregatesStore.get(key);
|
theAggregators = aggregatesStore.get(key);
|
||||||
if (theAggregators == null) {
|
if (theAggregators == null) {
|
||||||
theAggregators = makeAggregators(cursor, query.getAggregatorSpecs());
|
theAggregators = makeAggregators(cursor, query.getAggregatorSpecs());
|
||||||
|
|
|
@ -281,7 +281,6 @@ public class IncrementalIndex implements Iterable<Row>
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -330,7 +329,7 @@ public class IncrementalIndex implements Iterable<Row>
|
||||||
int count = 0;
|
int count = 0;
|
||||||
for (String dimValue : dimValues) {
|
for (String dimValue : dimValues) {
|
||||||
String canonicalDimValue = dimLookup.get(dimValue);
|
String canonicalDimValue = dimLookup.get(dimValue);
|
||||||
if (canonicalDimValue == null) {
|
if (canonicalDimValue == null && !dimLookup.contains(dimValue)) {
|
||||||
canonicalDimValue = dimValue;
|
canonicalDimValue = dimValue;
|
||||||
dimLookup.add(dimValue);
|
dimLookup.add(dimValue);
|
||||||
}
|
}
|
||||||
|
@ -560,7 +559,17 @@ public class IncrementalIndex implements Iterable<Row>
|
||||||
|
|
||||||
int valsIndex = 0;
|
int valsIndex = 0;
|
||||||
while (retVal == 0 && valsIndex < lhsVals.length) {
|
while (retVal == 0 && valsIndex < lhsVals.length) {
|
||||||
retVal = lhsVals[valsIndex].compareTo(rhsVals[valsIndex]);
|
final String lhsVal = lhsVals[valsIndex];
|
||||||
|
final String rhsVal = rhsVals[valsIndex];
|
||||||
|
if (lhsVal == null && rhsVal == null) {
|
||||||
|
return 0;
|
||||||
|
} else if (lhsVal == null) {
|
||||||
|
return -1;
|
||||||
|
} else if (rhsVal == null) {
|
||||||
|
return 1;
|
||||||
|
} else {
|
||||||
|
retVal = lhsVal.compareTo(rhsVal);
|
||||||
|
}
|
||||||
++valsIndex;
|
++valsIndex;
|
||||||
}
|
}
|
||||||
++index;
|
++index;
|
||||||
|
@ -576,16 +585,16 @@ public class IncrementalIndex implements Iterable<Row>
|
||||||
"timestamp=" + new DateTime(timestamp) +
|
"timestamp=" + new DateTime(timestamp) +
|
||||||
", dims=" + Lists.transform(
|
", dims=" + Lists.transform(
|
||||||
Arrays.asList(dims), new Function<String[], Object>()
|
Arrays.asList(dims), new Function<String[], Object>()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public Object apply(@Nullable String[] input)
|
public Object apply(@Nullable String[] input)
|
||||||
{
|
{
|
||||||
if (input == null || input.length == 0) {
|
if (input == null || input.length == 0) {
|
||||||
return Arrays.asList("null");
|
return Arrays.asList("null");
|
||||||
|
}
|
||||||
|
return Arrays.asList(input);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return Arrays.asList(input);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
) +
|
) +
|
||||||
'}';
|
'}';
|
||||||
}
|
}
|
||||||
|
@ -593,6 +602,7 @@ public class IncrementalIndex implements Iterable<Row>
|
||||||
|
|
||||||
static class DimDim
|
static class DimDim
|
||||||
{
|
{
|
||||||
|
public static final String NULL_STRING = "\u0000";
|
||||||
private final Map<String, String> poorMansInterning = Maps.newConcurrentMap();
|
private final Map<String, String> poorMansInterning = Maps.newConcurrentMap();
|
||||||
private final Map<String, Integer> falseIds;
|
private final Map<String, Integer> falseIds;
|
||||||
private final Map<Integer, String> falseIdsReverse;
|
private final Map<Integer, String> falseIdsReverse;
|
||||||
|
@ -605,19 +615,32 @@ public class IncrementalIndex implements Iterable<Row>
|
||||||
falseIdsReverse = biMap.inverse();
|
falseIdsReverse = biMap.inverse();
|
||||||
}
|
}
|
||||||
|
|
||||||
public String get(String value)
|
public boolean contains(@Nullable String value)
|
||||||
{
|
{
|
||||||
return value == null ? null : poorMansInterning.get(value);
|
return poorMansInterning.containsKey(value == null ? NULL_STRING : value);
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getId(String value)
|
public String get(@Nullable String value)
|
||||||
{
|
{
|
||||||
return falseIds.get(value);
|
final String retVal;
|
||||||
|
if (value == null) {
|
||||||
|
retVal = poorMansInterning.get(NULL_STRING);
|
||||||
|
} else {
|
||||||
|
retVal = poorMansInterning.get(value);
|
||||||
|
}
|
||||||
|
return retVal == null ? null : (retVal.equals(NULL_STRING) ? null : retVal);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getId(@Nullable String value)
|
||||||
|
{
|
||||||
|
return value == null ? falseIds.get(NULL_STRING) : falseIds.get(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Nullable
|
||||||
public String getValue(int id)
|
public String getValue(int id)
|
||||||
{
|
{
|
||||||
return falseIdsReverse.get(id);
|
final String value = falseIdsReverse.get(id);
|
||||||
|
return value.equals(NULL_STRING) ? null : value;
|
||||||
}
|
}
|
||||||
|
|
||||||
public int size()
|
public int size()
|
||||||
|
@ -625,27 +648,30 @@ public class IncrementalIndex implements Iterable<Row>
|
||||||
return poorMansInterning.size();
|
return poorMansInterning.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
public Set<String> keySet()
|
public synchronized void add(@Nullable String value)
|
||||||
{
|
|
||||||
return poorMansInterning.keySet();
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized void add(String value)
|
|
||||||
{
|
{
|
||||||
|
if (value == null) {
|
||||||
|
value = NULL_STRING;
|
||||||
|
}
|
||||||
poorMansInterning.put(value, value);
|
poorMansInterning.put(value, value);
|
||||||
falseIds.put(value, falseIds.size());
|
falseIds.put(value, falseIds.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getSortedId(String value)
|
public int getSortedId(@Nullable String value)
|
||||||
{
|
{
|
||||||
assertSorted();
|
assertSorted();
|
||||||
|
if (value == null) {
|
||||||
|
value = NULL_STRING;
|
||||||
|
}
|
||||||
return Arrays.binarySearch(sortedVals, value);
|
return Arrays.binarySearch(sortedVals, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Nullable
|
||||||
public String getSortedValue(int index)
|
public String getSortedValue(int index)
|
||||||
{
|
{
|
||||||
assertSorted();
|
assertSorted();
|
||||||
return sortedVals[index];
|
final String sortedVal = sortedVals[index];
|
||||||
|
return sortedVal.equals(NULL_STRING) ? null : sortedVal;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void sort()
|
public void sort()
|
||||||
|
|
|
@ -189,7 +189,7 @@ public class SpatialDimensionRowFormatter
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
for (String dimVal : dimVals) {
|
for (String dimVal : dimVals) {
|
||||||
if (Floats.tryParse(dimVal) == null) {
|
if (dimVal == null || Floats.tryParse(dimVal) == null) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,6 +47,7 @@ import io.druid.query.aggregation.MaxAggregatorFactory;
|
||||||
import io.druid.query.dimension.DefaultDimensionSpec;
|
import io.druid.query.dimension.DefaultDimensionSpec;
|
||||||
import io.druid.query.dimension.DimensionSpec;
|
import io.druid.query.dimension.DimensionSpec;
|
||||||
import io.druid.query.dimension.ExtractionDimensionSpec;
|
import io.druid.query.dimension.ExtractionDimensionSpec;
|
||||||
|
import io.druid.query.extraction.DimExtractionFn;
|
||||||
import io.druid.query.extraction.RegexDimExtractionFn;
|
import io.druid.query.extraction.RegexDimExtractionFn;
|
||||||
import io.druid.query.filter.JavaScriptDimFilter;
|
import io.druid.query.filter.JavaScriptDimFilter;
|
||||||
import io.druid.query.filter.RegexDimFilter;
|
import io.druid.query.filter.RegexDimFilter;
|
||||||
|
@ -210,9 +211,11 @@ public class GroupByQueryRunnerTest
|
||||||
TestHelper.assertExpectedObjects(expectedResults, results, "");
|
TestHelper.assertExpectedObjects(expectedResults, results, "");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGroupByWithDimExtractionFn()
|
public void testGroupByWithDimExtractionFn()
|
||||||
{
|
{
|
||||||
|
final DimExtractionFn fn1 = new RegexDimExtractionFn("(\\w{1})");
|
||||||
GroupByQuery query = GroupByQuery
|
GroupByQuery query = GroupByQuery
|
||||||
.builder()
|
.builder()
|
||||||
.setDataSource(QueryRunnerTestHelper.dataSource)
|
.setDataSource(QueryRunnerTestHelper.dataSource)
|
||||||
|
@ -222,7 +225,20 @@ public class GroupByQueryRunnerTest
|
||||||
new ExtractionDimensionSpec(
|
new ExtractionDimensionSpec(
|
||||||
"quality",
|
"quality",
|
||||||
"alias",
|
"alias",
|
||||||
new RegexDimExtractionFn("(\\w{1})")
|
new DimExtractionFn()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public byte[] getCacheKey()
|
||||||
|
{
|
||||||
|
return new byte[]{(byte)0xFF};
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String apply(String dimValue)
|
||||||
|
{
|
||||||
|
return dimValue.equals("mezzanine") ? null : fn1.apply(dimValue);
|
||||||
|
}
|
||||||
|
}
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
@ -236,20 +252,20 @@ public class GroupByQueryRunnerTest
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
List<Row> expectedResults = Arrays.asList(
|
List<Row> expectedResults = Arrays.asList(
|
||||||
|
createExpectedRow("2011-04-01", "alias", null, "rows", 3L, "idx", 2870L),
|
||||||
createExpectedRow("2011-04-01", "alias", "a", "rows", 1L, "idx", 135L),
|
createExpectedRow("2011-04-01", "alias", "a", "rows", 1L, "idx", 135L),
|
||||||
createExpectedRow("2011-04-01", "alias", "b", "rows", 1L, "idx", 118L),
|
createExpectedRow("2011-04-01", "alias", "b", "rows", 1L, "idx", 118L),
|
||||||
createExpectedRow("2011-04-01", "alias", "e", "rows", 1L, "idx", 158L),
|
createExpectedRow("2011-04-01", "alias", "e", "rows", 1L, "idx", 158L),
|
||||||
createExpectedRow("2011-04-01", "alias", "h", "rows", 1L, "idx", 120L),
|
createExpectedRow("2011-04-01", "alias", "h", "rows", 1L, "idx", 120L),
|
||||||
createExpectedRow("2011-04-01", "alias", "m", "rows", 3L, "idx", 2870L),
|
|
||||||
createExpectedRow("2011-04-01", "alias", "n", "rows", 1L, "idx", 121L),
|
createExpectedRow("2011-04-01", "alias", "n", "rows", 1L, "idx", 121L),
|
||||||
createExpectedRow("2011-04-01", "alias", "p", "rows", 3L, "idx", 2900L),
|
createExpectedRow("2011-04-01", "alias", "p", "rows", 3L, "idx", 2900L),
|
||||||
createExpectedRow("2011-04-01", "alias", "t", "rows", 2L, "idx", 197L),
|
createExpectedRow("2011-04-01", "alias", "t", "rows", 2L, "idx", 197L),
|
||||||
|
|
||||||
|
createExpectedRow("2011-04-02", "alias", null, "rows", 3L, "idx", 2447L),
|
||||||
createExpectedRow("2011-04-02", "alias", "a", "rows", 1L, "idx", 147L),
|
createExpectedRow("2011-04-02", "alias", "a", "rows", 1L, "idx", 147L),
|
||||||
createExpectedRow("2011-04-02", "alias", "b", "rows", 1L, "idx", 112L),
|
createExpectedRow("2011-04-02", "alias", "b", "rows", 1L, "idx", 112L),
|
||||||
createExpectedRow("2011-04-02", "alias", "e", "rows", 1L, "idx", 166L),
|
createExpectedRow("2011-04-02", "alias", "e", "rows", 1L, "idx", 166L),
|
||||||
createExpectedRow("2011-04-02", "alias", "h", "rows", 1L, "idx", 113L),
|
createExpectedRow("2011-04-02", "alias", "h", "rows", 1L, "idx", 113L),
|
||||||
createExpectedRow("2011-04-02", "alias", "m", "rows", 3L, "idx", 2447L),
|
|
||||||
createExpectedRow("2011-04-02", "alias", "n", "rows", 1L, "idx", 114L),
|
createExpectedRow("2011-04-02", "alias", "n", "rows", 1L, "idx", 114L),
|
||||||
createExpectedRow("2011-04-02", "alias", "p", "rows", 3L, "idx", 2505L),
|
createExpectedRow("2011-04-02", "alias", "p", "rows", 3L, "idx", 2505L),
|
||||||
createExpectedRow("2011-04-02", "alias", "t", "rows", 2L, "idx", 223L)
|
createExpectedRow("2011-04-02", "alias", "t", "rows", 2L, "idx", 223L)
|
||||||
|
|
|
@ -23,6 +23,7 @@ import com.google.common.base.Supplier;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.common.collect.Iterables;
|
import com.google.common.collect.Iterables;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
|
import com.google.common.collect.Maps;
|
||||||
import com.metamx.common.guava.Sequences;
|
import com.metamx.common.guava.Sequences;
|
||||||
import io.druid.collections.StupidPool;
|
import io.druid.collections.StupidPool;
|
||||||
import io.druid.query.BySegmentResultValueClass;
|
import io.druid.query.BySegmentResultValueClass;
|
||||||
|
@ -36,6 +37,7 @@ import io.druid.query.aggregation.MaxAggregatorFactory;
|
||||||
import io.druid.query.aggregation.MinAggregatorFactory;
|
import io.druid.query.aggregation.MinAggregatorFactory;
|
||||||
import io.druid.query.aggregation.PostAggregator;
|
import io.druid.query.aggregation.PostAggregator;
|
||||||
import io.druid.query.dimension.ExtractionDimensionSpec;
|
import io.druid.query.dimension.ExtractionDimensionSpec;
|
||||||
|
import io.druid.query.extraction.DimExtractionFn;
|
||||||
import io.druid.query.extraction.RegexDimExtractionFn;
|
import io.druid.query.extraction.RegexDimExtractionFn;
|
||||||
import io.druid.query.filter.AndDimFilter;
|
import io.druid.query.filter.AndDimFilter;
|
||||||
import io.druid.query.filter.DimFilter;
|
import io.druid.query.filter.DimFilter;
|
||||||
|
@ -52,6 +54,7 @@ import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
@ -1129,6 +1132,74 @@ public class TopNQueryRunnerTest
|
||||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTopNDimExtractionNulls()
|
||||||
|
{
|
||||||
|
TopNQuery query = new TopNQueryBuilder()
|
||||||
|
.dataSource(QueryRunnerTestHelper.dataSource)
|
||||||
|
.granularity(QueryRunnerTestHelper.allGran)
|
||||||
|
.dimension(
|
||||||
|
new ExtractionDimensionSpec(
|
||||||
|
providerDimension, providerDimension, new DimExtractionFn()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public byte[] getCacheKey()
|
||||||
|
{
|
||||||
|
return new byte[]{(byte)0xFF};
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String apply(String dimValue)
|
||||||
|
{
|
||||||
|
return dimValue.equals("total_market") ? null : dimValue;
|
||||||
|
}
|
||||||
|
})
|
||||||
|
)
|
||||||
|
.metric("rows")
|
||||||
|
.threshold(4)
|
||||||
|
.intervals(QueryRunnerTestHelper.firstToThird)
|
||||||
|
.aggregators(QueryRunnerTestHelper.commonAggregators)
|
||||||
|
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
Map<String, Object> nullValue = Maps.newHashMap();
|
||||||
|
nullValue.put(providerDimension, null);
|
||||||
|
nullValue.putAll(ImmutableMap.of(
|
||||||
|
"rows", 4L,
|
||||||
|
"index", 5351.814697265625D,
|
||||||
|
"addRowsIndexConstant", 5356.814697265625D,
|
||||||
|
"uniques", QueryRunnerTestHelper.UNIQUES_2
|
||||||
|
));
|
||||||
|
|
||||||
|
List<Result<TopNResultValue>> expectedResults = Arrays.asList(
|
||||||
|
new Result<>(
|
||||||
|
new DateTime("2011-04-01T00:00:00.000Z"),
|
||||||
|
new TopNResultValue(
|
||||||
|
Arrays.<Map<String, Object>>asList(
|
||||||
|
ImmutableMap.<String, Object>of(
|
||||||
|
providerDimension, "spot",
|
||||||
|
"rows", 18L,
|
||||||
|
"index", 2231.8768157958984D,
|
||||||
|
"addRowsIndexConstant", 2250.8768157958984D,
|
||||||
|
"uniques", QueryRunnerTestHelper.UNIQUES_9
|
||||||
|
),
|
||||||
|
nullValue
|
||||||
|
,
|
||||||
|
ImmutableMap.<String, Object>of(
|
||||||
|
providerDimension, "upfront",
|
||||||
|
"rows", 4L,
|
||||||
|
"index", 4875.669677734375D,
|
||||||
|
"addRowsIndexConstant", 4880.669677734375D,
|
||||||
|
"uniques", QueryRunnerTestHelper.UNIQUES_2
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testInvertedTopNQuery()
|
public void testInvertedTopNQuery()
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in New Issue