mirror of https://github.com/apache/druid.git
fix group-by v2 BufferArrayGrouper for empty multi-value dimension row (#7794)
* fix groupby v2 BufferArrayGrouper
* better name test
* fix sql compatible null handling array grouper bug
* another test
This commit is contained in:
parent 58a571ccda
commit aaefdb3386
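The heart of the fix: BufferArrayGrouper's result iterator declared int cur; (defaulting to 0) and scanned for used slots starting at cur + 1, so slot 0 — the slot reserved for the missing value that an empty multi-value dimension row groups into, emitted as key -1 — was never visited. The patch replaces the lazily advancing cursor with a precomputed "next" slot seeded by findNext(-1), which starts the scan at slot 0. A minimal standalone sketch of the corrected pattern (the class name and the boolean[] slot table are illustrative stand-ins, not Druid's actual API):

import java.util.Iterator;
import java.util.NoSuchElementException;

class UsedSlotIterator implements Iterator<Integer>
{
  private final boolean[] used; // stands in for isUsedSlot(i); slot 0 is the missing-value slot
  private int next;             // next used slot, or -1 once exhausted

  UsedSlotIterator(boolean[] used)
  {
    this.used = used;
    this.next = findNext(-1); // seeding with -1 makes the scan include slot 0
  }

  @Override
  public boolean hasNext()
  {
    return next >= 0;
  }

  @Override
  public Integer next()
  {
    if (next < 0) {
      throw new NoSuchElementException();
    }
    final int current = next;
    next = findNext(current);
    return current - 1; // shift back by -1 so slot 0 maps to the missing-value key -1
  }

  private int findNext(int current)
  {
    // scan for the next used slot after the current position
    for (int i = current + 1; i < used.length; i++) {
      if (used[i]) {
        return i;
      }
    }
    return -1; // no more slots
  }
}

Seeding with -1 is the whole fix: findNext(-1) examines slot 0 first, whereas the old zero-argument findNext(), with cur defaulting to 0, began scanning at slot 1 and silently dropped the empty-row group.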
@@ -233,48 +233,33 @@ public class BufferArrayGrouper implements IntGrouper
     return new CloseableIterator<Entry<Integer>>()
     {
-      int cur;
-      boolean findNext = false;
-
-      {
-        cur = findNext();
-      }
+      // initialize to the first used slot
+      private int next = findNext(-1);

       @Override
       public boolean hasNext()
       {
-        if (findNext) {
-          cur = findNext();
-          findNext = false;
-        }
-        return cur >= 0;
-      }
-
-      private int findNext()
-      {
-        for (int i = cur + 1; i < cardinalityWithMissingValue; i++) {
-          if (isUsedSlot(i)) {
-            return i;
-          }
-        }
-        return -1;
+        return next >= 0;
       }

       @Override
       public Entry<Integer> next()
       {
-        if (cur < 0) {
+        if (next < 0) {
           throw new NoSuchElementException();
         }
-        findNext = true;
+
+        final int current = next;
+        next = findNext(current);

         final Object[] values = new Object[aggregators.length];
-        final int recordOffset = cur * recordSize;
+        final int recordOffset = current * recordSize;
         for (int i = 0; i < aggregators.length; i++) {
           values[i] = aggregators[i].get(valBuffer, recordOffset + aggregatorOffsets[i]);
         }
-        return new Entry<>(cur - 1, values);
+        // shift by -1 since values are initially shifted by +1 so they are all positive and
+        // GroupByColumnSelectorStrategy.GROUP_BY_MISSING_VALUE is -1
+        return new Entry<>(current - 1, values);
       }

       @Override
@@ -282,6 +267,18 @@ public class BufferArrayGrouper implements IntGrouper
       {
         // do nothing
       }
+
+      private int findNext(int current)
+      {
+        // shift by +1 since we're looking for the next used slot after the current position
+        for (int i = current + 1; i < cardinalityWithMissingValue; i++) {
+          if (isUsedSlot(i)) {
+            return i;
+          }
+        }
+        // no more slots
+        return -1;
+      }
     };
   }
 }
@@ -359,7 +359,7 @@ public class GroupByQueryEngineV2
         delegate.close();
       }
       delegate = initNewDelegate();
-      return true;
+      return delegate.hasNext();
     } else {
       return false;
     }
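The GroupByQueryEngineV2 hunk above closes a second path to dropped or phantom rows: after rolling over to a freshly initialized delegate iterator, the old code answered true unconditionally, but the new delegate may itself be empty (for instance when every row it covers is filtered out). A self-contained sketch of the concatenating-iterator shape this guards, with hypothetical names rather than GroupByQueryEngineV2's actual members:

import java.util.Deque;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Sketch: a concatenating iterator must consult each freshly opened delegate,
// because a delegate is not guaranteed to hold even one row.
class ConcatIterator<T> implements Iterator<T>
{
  private final Deque<Iterator<T>> sources;
  private Iterator<T> delegate;

  ConcatIterator(Deque<Iterator<T>> sources)
  {
    this.sources = sources;
  }

  @Override
  public boolean hasNext()
  {
    while (delegate == null || !delegate.hasNext()) {
      if (sources.isEmpty()) {
        return false;            // nothing left to open
      }
      delegate = sources.poll(); // answering "true" here was the bug shape:
                                 // the next delegate may be empty
    }
    return true;
  }

  @Override
  public T next()
  {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return delegate.next();
  }
}

An empty first delegate is skipped rather than reported as a phantom row, which is exactly what return delegate.hasNext() restores in the real method.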
@@ -681,7 +681,7 @@ public class GroupByQueryEngineV2
             ((DimensionSelector) dim.getSelector()).lookupName(key)
         );
       } else {
-        map.put(dim.getOutputName(), "");
+        map.put(dim.getOutputName(), NullHandling.defaultStringValue());
       }
     }
   }
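Replacing the hard-coded "" with NullHandling.defaultStringValue() makes the value emitted for a missing dimension track the configured null-handling mode instead of always pretending to be an empty string. A sketch of its semantics (an approximation, not Druid's real class):

// Approximate semantics of org.apache.druid.common.config.NullHandling (a sketch, not
// the real class); the flag mirrors the druid.generic.useDefaultValueForNull setting.
class NullHandlingSketch
{
  static boolean replaceWithDefault = true; // true = default mode, false = SQL-compatible

  static String defaultStringValue()        // may return null in SQL-compatible mode
  {
    return replaceWithDefault ? "" : null;
  }
}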
@@ -25,9 +25,11 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.io.Files;
 import org.apache.commons.io.FileUtils;
 import org.apache.druid.collections.CloseableStupidPool;
+import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.data.input.Row;
 import org.apache.druid.data.input.impl.CSVParseSpec;
 import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.JSONParseSpec;
 import org.apache.druid.data.input.impl.StringInputRowParser;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.java.util.common.DateTimes;
@@ -38,11 +40,13 @@ import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.dimension.DefaultDimensionSpec;
 import org.apache.druid.query.dimension.ListFilteredDimensionSpec;
 import org.apache.druid.query.dimension.RegexFilteredDimensionSpec;
+import org.apache.druid.query.filter.InDimFilter;
 import org.apache.druid.query.filter.SelectorDimFilter;
 import org.apache.druid.query.groupby.GroupByQuery;
 import org.apache.druid.query.groupby.GroupByQueryConfig;
 import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
 import org.apache.druid.query.groupby.GroupByQueryRunnerTestHelper;
+import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
 import org.apache.druid.query.spec.LegacySegmentSpec;
 import org.apache.druid.query.topn.TopNQuery;
 import org.apache.druid.query.topn.TopNQueryBuilder;
@@ -82,13 +86,15 @@ import java.util.Map;
 @RunWith(Parameterized.class)
 public class MultiValuedDimensionTest
 {
-  @Parameterized.Parameters(name = "{0}")
+  @Parameterized.Parameters(name = "groupby: {0} forceHashAggregation: {2} ({1})")
   public static Collection<?> constructorFeeder()
   {
     final List<Object[]> constructors = new ArrayList<>();
     for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) {
-      constructors.add(new Object[]{config, TmpFileSegmentWriteOutMediumFactory.instance()});
-      constructors.add(new Object[]{config, OffHeapMemorySegmentWriteOutMediumFactory.instance()});
+      constructors.add(new Object[]{config, TmpFileSegmentWriteOutMediumFactory.instance(), false});
+      constructors.add(new Object[]{config, OffHeapMemorySegmentWriteOutMediumFactory.instance(), false});
+      constructors.add(new Object[]{config, TmpFileSegmentWriteOutMediumFactory.instance(), true});
+      constructors.add(new Object[]{config, OffHeapMemorySegmentWriteOutMediumFactory.instance(), true});
     }
     return constructors;
   }
@@ -101,7 +107,13 @@ public class MultiValuedDimensionTest

   private File persistedSegmentDir;

-  public MultiValuedDimensionTest(final GroupByQueryConfig config, SegmentWriteOutMediumFactory segmentWriteOutMediumFactory)
+  private IncrementalIndex incrementalIndexNullSampler;
+  private QueryableIndex queryableIndexNullSampler;
+  private File persistedSegmentDirNullSampler;
+
+  private final ImmutableMap<String, Object> context;
+
+  public MultiValuedDimensionTest(final GroupByQueryConfig config, SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, boolean forceHashAggregation)
   {
     helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
         ImmutableList.of(),
@@ -109,6 +121,10 @@ public class MultiValuedDimensionTest
         null
     );
     this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory;
+
+    this.context = config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V1)
+                   ? ImmutableMap.of()
+                   : ImmutableMap.of("forceHashAggregation", forceHashAggregation);
   }

   @Before
@@ -147,6 +163,41 @@ public class MultiValuedDimensionTest
         .persist(incrementalIndex, persistedSegmentDir, new IndexSpec(), null);

     queryableIndex = TestHelper.getTestIndexIO().loadIndex(persistedSegmentDir);
+
+    StringInputRowParser parserNullSampler = new StringInputRowParser(
+        new JSONParseSpec(
+            new TimestampSpec("time", "iso", null),
+            new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("product", "tags", "othertags")), null, null)
+        ),
+        "UTF-8"
+    );
+
+    incrementalIndexNullSampler = new IncrementalIndex.Builder()
+        .setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
+        .setMaxRowCount(5000)
+        .buildOnheap();
+
+    String[] rowsNullSampler = new String[]{
+        "{\"time\":\"2011-01-13T00:00:00.000Z\",\"product\":\"product_1\",\"tags\":[],\"othertags\":[\"u1\", \"u2\"]}",
+        "{\"time\":\"2011-01-12T00:00:00.000Z\",\"product\":\"product_2\",\"othertags\":[\"u3\", \"u4\"]}",
+        "{\"time\":\"2011-01-14T00:00:00.000Z\",\"product\":\"product_3\",\"tags\":[\"\"],\"othertags\":[\"u1\", \"u5\"]}",
+        "{\"time\":\"2011-01-15T00:00:00.000Z\",\"product\":\"product_4\",\"tags\":[\"t1\", \"t2\", \"\"],\"othertags\":[\"u6\", \"u7\"]}",
+        "{\"time\":\"2011-01-16T00:00:00.000Z\",\"product\":\"product_5\",\"tags\":[],\"othertags\":[]}",
+        "{\"time\":\"2011-01-16T00:00:00.000Z\",\"product\":\"product_6\"}",
+        "{\"time\":\"2011-01-16T00:00:00.000Z\",\"product\":\"product_7\",\"othertags\":[]}",
+        "{\"time\":\"2011-01-16T00:00:00.000Z\",\"product\":\"product_8\",\"tags\":[\"\"],\"othertags\":[]}"
+    };
+
+    for (String row : rowsNullSampler) {
+      incrementalIndexNullSampler.add(parserNullSampler.parse(row));
+    }
+    persistedSegmentDirNullSampler = Files.createTempDir();
+    TestHelper.getTestIndexMergerV9(segmentWriteOutMediumFactory)
+              .persist(incrementalIndexNullSampler, persistedSegmentDirNullSampler, new IndexSpec(), null);
+
+    queryableIndexNullSampler = TestHelper.getTestIndexIO().loadIndex(persistedSegmentDirNullSampler);
   }

   @After
@@ -165,6 +216,7 @@ public class MultiValuedDimensionTest
         .setGranularity(Granularities.ALL)
         .setDimensions(new DefaultDimensionSpec("tags", "tags"))
         .setAggregatorSpecs(new CountAggregatorFactory("count"))
+        .setContext(context)
         .build();

     Sequence<Row> result = helper.runQueryOnSegmentsObjs(
@@ -200,6 +252,7 @@ public class MultiValuedDimensionTest
         .setDimensions(new DefaultDimensionSpec("tags", "tags"))
         .setAggregatorSpecs(new CountAggregatorFactory("count"))
         .setDimFilter(new SelectorDimFilter("tags", "t3", null))
+        .setContext(context)
         .build();

     Sequence<Row> result = helper.runQueryOnSegmentsObjs(
@@ -221,6 +274,79 @@ public class MultiValuedDimensionTest
     TestHelper.assertExpectedObjects(expectedResults, result.toList(), "dimFilter");
   }

+  @Test
+  public void testGroupByWithDimFilterEmptyResults()
+  {
+    GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource("xx")
+        .setQuerySegmentSpec(new LegacySegmentSpec("1970/3000"))
+        .setGranularity(Granularities.ALL)
+        .setDimensions(new DefaultDimensionSpec("tags", "tags"))
+        .setAggregatorSpecs(new CountAggregatorFactory("count"))
+        .setDimFilter(new InDimFilter("product", ImmutableList.of("product_5"), null))
+        .setContext(context)
+        .build();
+
+    Sequence<Row> result = helper.runQueryOnSegmentsObjs(
+        ImmutableList.of(
+            new QueryableIndexSegment(queryableIndexNullSampler, SegmentId.dummy("sid1")),
+            new IncrementalIndexSegment(incrementalIndexNullSampler, SegmentId.dummy("sid2"))
+        ),
+        query
+    );
+
+    List<Row> expectedResults = Collections.singletonList(
+        GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", null, "count", 2L)
+    );
+
+    TestHelper.assertExpectedObjects(expectedResults, result.toList(), "filter-empty");
+  }
+
+  @Test
+  public void testGroupByWithDimFilterNullishResults()
+  {
+    GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource("xx")
+        .setQuerySegmentSpec(new LegacySegmentSpec("1970/3000"))
+        .setGranularity(Granularities.ALL)
+        .setDimensions(new DefaultDimensionSpec("tags", "tags"))
+        .setAggregatorSpecs(new CountAggregatorFactory("count"))
+        .setDimFilter(
+            new InDimFilter("product", ImmutableList.of("product_5", "product_6", "product_8"), null)
+        )
+        .setContext(context)
+        .build();
+
+    Sequence<Row> result = helper.runQueryOnSegmentsObjs(
+        ImmutableList.of(
+            new QueryableIndexSegment(queryableIndexNullSampler, SegmentId.dummy("sid1")),
+            new IncrementalIndexSegment(incrementalIndexNullSampler, SegmentId.dummy("sid2"))
+        ),
+        query
+    );
+
+    List<Row> expectedResults;
+    // an empty row e.g. [], or group by 'missing' value, is grouped with the default string value, "" or null
+    // grouping input is filtered to [], null, [""]
+    if (NullHandling.replaceWithDefault()) {
+      // when sql compatible null handling is disabled, the inputs are effectively [], null, [null] and
+      // are all grouped as null
+      expectedResults = Collections.singletonList(
+          GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", null, "count", 6L)
+      );
+    } else {
+      // with sql compatible null handling, null and [] = null, but [""] = ""
+      expectedResults = ImmutableList.of(
+          GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", null, "count", 4L),
+          GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", "", "count", 2L)
+      );
+    }
+
+    TestHelper.assertExpectedObjects(expectedResults, result.toList(), "filter-nullish");
+  }
+
   @Test
   public void testGroupByWithDimFilterAndWithFilteredDimSpec()
   {
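Reading the expected counts in these new tests: each query runs over two segments built from the same eight sampled rows (a QueryableIndexSegment plus an IncrementalIndexSegment), so every matching input row contributes twice. Filtering to product_5 leaves a single row with tags [], which groups under the missing value: one null row with count 2. Filtering to products 5, 6, and 8 leaves tags inputs [], missing, and [""]; in default null-handling mode all three collapse to null (3 rows x 2 segments = 6), while with SQL-compatible null handling [] and missing group as null (count 4) and [""] groups as "" (count 2).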
@@ -232,6 +358,7 @@ public class MultiValuedDimensionTest
         .setDimensions(new RegexFilteredDimensionSpec(new DefaultDimensionSpec("tags", "tags"), "t3"))
         .setAggregatorSpecs(new CountAggregatorFactory("count"))
         .setDimFilter(new SelectorDimFilter("tags", "t3", null))
+        .setContext(context)
         .build();

     Sequence<Row> result = helper.runQueryOnSegmentsObjs(
@@ -264,7 +391,8 @@ public class MultiValuedDimensionTest
         .intervals(QueryRunnerTestHelper.fullOnIntervalSpec)
         .aggregators(Collections.singletonList(new CountAggregatorFactory("count")))
         .threshold(5)
-        .filters(new SelectorDimFilter("tags", "t3", null)).build();
+        .filters(new SelectorDimFilter("tags", "t3", null))
+        .build();

     try (CloseableStupidPool<ByteBuffer> pool = TestQueryRunners.createDefaultNonBlockingPool()) {
       QueryRunnerFactory factory = new TopNQueryRunnerFactory(