mirror of https://github.com/apache/druid.git
Addressing issues for dictionary encoded single string columns where we can use the dictionary ids instead of the entire string
This commit is contained in:
parent
f78ca05fd6
commit
6cae4901e3
|
@ -61,9 +61,10 @@ public class UnnestDataSource implements DataSource
|
|||
DimFilter unnestFilter
|
||||
)
|
||||
{
|
||||
this.base = dataSource;
|
||||
this.virtualColumn = virtualColumn;
|
||||
this.unnestFilter = unnestFilter;
|
||||
// select * from UNNEST(ARRAY[1,2,3]) as somu(d3) where somu.d3 IN ('a','b')
|
||||
this.base = dataSource; // table
|
||||
this.virtualColumn = virtualColumn; // MV_TO_ARRAY
|
||||
this.unnestFilter = unnestFilter; // d3 in (a,b)
|
||||
}
|
||||
|
||||
@JsonCreator
|
||||
|
|
|
@ -0,0 +1,124 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.aggregation.first;
|
||||
|
||||
import org.apache.druid.common.config.NullHandling;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.query.aggregation.SerializablePairLongString;
|
||||
import org.apache.druid.query.aggregation.VectorAggregator;
|
||||
import org.apache.druid.segment.data.IndexedInts;
|
||||
import org.apache.druid.segment.vector.BaseLongVectorValueSelector;
|
||||
import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
public class MultiStringFirstDimensionVectorAggregator implements VectorAggregator
|
||||
{
|
||||
private final BaseLongVectorValueSelector timeSelector;
|
||||
private final MultiValueDimensionVectorSelector valueDimensionVectorSelector;
|
||||
private long firstTime;
|
||||
private final int maxStringBytes;
|
||||
private final boolean useDefault = NullHandling.replaceWithDefault();
|
||||
|
||||
public MultiStringFirstDimensionVectorAggregator(
|
||||
BaseLongVectorValueSelector timeSelector,
|
||||
MultiValueDimensionVectorSelector valueDimensionVectorSelector,
|
||||
int maxStringBytes
|
||||
)
|
||||
{
|
||||
this.timeSelector = timeSelector;
|
||||
this.valueDimensionVectorSelector = valueDimensionVectorSelector;
|
||||
this.maxStringBytes = maxStringBytes;
|
||||
firstTime = Long.MAX_VALUE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(ByteBuffer buf, int position)
|
||||
{
|
||||
buf.putLong(position, Long.MAX_VALUE);
|
||||
buf.put(
|
||||
position + NumericFirstVectorAggregator.NULL_OFFSET,
|
||||
useDefault ? NullHandling.IS_NOT_NULL_BYTE : NullHandling.IS_NULL_BYTE
|
||||
);
|
||||
buf.putLong(position + NumericFirstVectorAggregator.VALUE_OFFSET, 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
|
||||
{
|
||||
final long[] timeVector = timeSelector.getLongVector();
|
||||
final IndexedInts[] valueVector = valueDimensionVectorSelector.getRowVector();
|
||||
firstTime = buf.getLong(position);
|
||||
int index = startRow;
|
||||
for (int i = startRow; i < endRow; i++) {
|
||||
if (valueVector[i].get(0) != 0) {
|
||||
index = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
final long earliestTime = timeVector[index];
|
||||
if (earliestTime < firstTime) {
|
||||
firstTime = earliestTime;
|
||||
buf.putLong(position, firstTime);
|
||||
buf.put(position + NumericFirstVectorAggregator.NULL_OFFSET, NullHandling.IS_NOT_NULL_BYTE);
|
||||
buf.putInt(position + NumericFirstVectorAggregator.VALUE_OFFSET, valueVector[index].get(0));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
|
||||
{
|
||||
long[] timeVector = timeSelector.getLongVector();
|
||||
IndexedInts[] values = valueDimensionVectorSelector.getRowVector();
|
||||
for (int i = 0; i < numRows; i++) {
|
||||
int position = positions[i] + positionOffset;
|
||||
int row = rows == null ? i : rows[i];
|
||||
long firstTime = buf.getLong(position);
|
||||
if (timeVector[row] < firstTime) {
|
||||
firstTime = timeVector[row];
|
||||
buf.putLong(position, firstTime);
|
||||
buf.put(position + NumericFirstVectorAggregator.NULL_OFFSET, NullHandling.IS_NOT_NULL_BYTE);
|
||||
buf.putInt(
|
||||
position + NumericFirstVectorAggregator.VALUE_OFFSET,
|
||||
values[row].size() > 0 ? values[row].get(0) : 0
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public Object get(ByteBuffer buf, int position)
|
||||
{
|
||||
int index = buf.getInt(position + NumericFirstVectorAggregator.VALUE_OFFSET);
|
||||
long earliest = buf.getLong(position);
|
||||
String strValue = valueDimensionVectorSelector.lookupName(index);
|
||||
return new SerializablePairLongString(earliest, StringUtils.chop(strValue, maxStringBytes));
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,118 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query.aggregation.first;
|
||||
|
||||
import org.apache.druid.common.config.NullHandling;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.query.aggregation.SerializablePairLongString;
|
||||
import org.apache.druid.query.aggregation.VectorAggregator;
|
||||
import org.apache.druid.segment.vector.BaseLongVectorValueSelector;
|
||||
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
public class SingleStringFirstDimensionVectorAggregator implements VectorAggregator
|
||||
{
|
||||
private final BaseLongVectorValueSelector timeSelector;
|
||||
private final SingleValueDimensionVectorSelector valueDimensionVectorSelector;
|
||||
private long firstTime;
|
||||
private final int maxStringBytes;
|
||||
private final boolean useDefault = NullHandling.replaceWithDefault();
|
||||
|
||||
public SingleStringFirstDimensionVectorAggregator(
|
||||
BaseLongVectorValueSelector timeSelector,
|
||||
SingleValueDimensionVectorSelector valueDimensionVectorSelector,
|
||||
int maxStringBytes
|
||||
)
|
||||
{
|
||||
this.timeSelector = timeSelector;
|
||||
this.valueDimensionVectorSelector = valueDimensionVectorSelector;
|
||||
this.maxStringBytes = maxStringBytes;
|
||||
this.firstTime = Long.MAX_VALUE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(ByteBuffer buf, int position)
|
||||
{
|
||||
buf.putLong(position, Long.MAX_VALUE);
|
||||
buf.put(
|
||||
position + NumericFirstVectorAggregator.NULL_OFFSET,
|
||||
useDefault ? NullHandling.IS_NOT_NULL_BYTE : NullHandling.IS_NULL_BYTE
|
||||
);
|
||||
buf.putLong(position + NumericFirstVectorAggregator.VALUE_OFFSET, 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
|
||||
{
|
||||
final long[] timeVector = timeSelector.getLongVector();
|
||||
final int[] valueVector = valueDimensionVectorSelector.getRowVector();
|
||||
firstTime = buf.getLong(position);
|
||||
int index = startRow;
|
||||
for (int i = startRow; i < endRow; i++) {
|
||||
index = i;
|
||||
break;
|
||||
}
|
||||
|
||||
final long earliestTime = timeVector[index];
|
||||
if (earliestTime < firstTime) {
|
||||
firstTime = earliestTime;
|
||||
buf.putLong(position, firstTime);
|
||||
buf.put(position + NumericFirstVectorAggregator.NULL_OFFSET, NullHandling.IS_NOT_NULL_BYTE);
|
||||
buf.putInt(position + NumericFirstVectorAggregator.VALUE_OFFSET, valueVector[index]);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
|
||||
{
|
||||
long[] timeVector = timeSelector.getLongVector();
|
||||
int[] values = valueDimensionVectorSelector.getRowVector();
|
||||
for (int i = 0; i < numRows; i++) {
|
||||
int position = positions[i] + positionOffset;
|
||||
int row = rows == null ? i : rows[i];
|
||||
long firstTime = buf.getLong(position);
|
||||
if (timeVector[row] < firstTime) {
|
||||
firstTime = timeVector[row];
|
||||
buf.putLong(position, firstTime);
|
||||
buf.put(position + NumericFirstVectorAggregator.NULL_OFFSET, NullHandling.IS_NOT_NULL_BYTE);
|
||||
buf.putInt(position + NumericFirstVectorAggregator.VALUE_OFFSET, values[row]);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public Object get(ByteBuffer buf, int position)
|
||||
{
|
||||
int index = buf.getInt(position + NumericFirstVectorAggregator.VALUE_OFFSET);
|
||||
long earliest = buf.getLong(position);
|
||||
String strValue = valueDimensionVectorSelector.lookupName(index);
|
||||
return new SerializablePairLongString(earliest, StringUtils.chop(strValue, maxStringBytes));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
// nothing to close
|
||||
}
|
||||
}
|
|
@ -34,6 +34,7 @@ import org.apache.druid.query.aggregation.BufferAggregator;
|
|||
import org.apache.druid.query.aggregation.SerializablePairLongString;
|
||||
import org.apache.druid.query.aggregation.VectorAggregator;
|
||||
import org.apache.druid.query.cache.CacheKeyBuilder;
|
||||
import org.apache.druid.query.dimension.DefaultDimensionSpec;
|
||||
import org.apache.druid.segment.BaseObjectColumnValueSelector;
|
||||
import org.apache.druid.segment.ColumnInspector;
|
||||
import org.apache.druid.segment.ColumnSelectorFactory;
|
||||
|
@ -41,7 +42,10 @@ import org.apache.druid.segment.NilColumnValueSelector;
|
|||
import org.apache.druid.segment.column.ColumnCapabilities;
|
||||
import org.apache.druid.segment.column.ColumnHolder;
|
||||
import org.apache.druid.segment.column.ColumnType;
|
||||
import org.apache.druid.segment.column.ValueType;
|
||||
import org.apache.druid.segment.vector.BaseLongVectorValueSelector;
|
||||
import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
|
||||
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
|
||||
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
|
||||
import org.apache.druid.segment.vector.VectorObjectSelector;
|
||||
|
||||
|
@ -105,6 +109,8 @@ public class StringFirstAggregatorFactory extends AggregatorFactory
|
|||
private final String timeColumn;
|
||||
protected final int maxStringBytes;
|
||||
|
||||
private boolean getFirstElementFromMvd;
|
||||
|
||||
@JsonCreator
|
||||
public StringFirstAggregatorFactory(
|
||||
@JsonProperty("name") String name,
|
||||
|
@ -126,8 +132,10 @@ public class StringFirstAggregatorFactory extends AggregatorFactory
|
|||
this.maxStringBytes = maxStringBytes == null
|
||||
? StringFirstAggregatorFactory.DEFAULT_MAX_STRING_SIZE
|
||||
: maxStringBytes;
|
||||
this.getFirstElementFromMvd = false;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Aggregator factorize(ColumnSelectorFactory metricFactory)
|
||||
{
|
||||
|
@ -163,10 +171,30 @@ public class StringFirstAggregatorFactory extends AggregatorFactory
|
|||
@Override
|
||||
public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory)
|
||||
{
|
||||
ColumnCapabilities capabilities = selectorFactory.getColumnCapabilities(fieldName);
|
||||
VectorObjectSelector vSelector = selectorFactory.makeObjectSelector(fieldName);
|
||||
BaseLongVectorValueSelector timeSelector = (BaseLongVectorValueSelector) selectorFactory.makeValueSelector(
|
||||
timeColumn);
|
||||
ColumnCapabilities capabilities = selectorFactory.getColumnCapabilities(fieldName);
|
||||
if (capabilities != null) {
|
||||
if (capabilities.is(ValueType.STRING) && capabilities.isDictionaryEncoded().isTrue()) {
|
||||
// Case 1: Multivalue string with dimension selector
|
||||
if (capabilities.hasMultipleValues().isTrue()) {
|
||||
if (isGetFirstElementFromMvd()) {
|
||||
MultiValueDimensionVectorSelector mSelector = selectorFactory.makeMultiValueDimensionSelector(
|
||||
DefaultDimensionSpec.of(
|
||||
fieldName));
|
||||
return new MultiStringFirstDimensionVectorAggregator(timeSelector, mSelector, maxStringBytes);
|
||||
}
|
||||
} else {
|
||||
// Case 2: Single string with dimension selector
|
||||
SingleValueDimensionVectorSelector sSelector = selectorFactory.makeSingleValueDimensionSelector(
|
||||
DefaultDimensionSpec.of(
|
||||
fieldName));
|
||||
return new SingleStringFirstDimensionVectorAggregator(timeSelector, sSelector, maxStringBytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
// Case 3: return vector object selector
|
||||
VectorObjectSelector vSelector = selectorFactory.makeObjectSelector(fieldName);
|
||||
if (capabilities != null) {
|
||||
return new StringFirstVectorAggregator(timeSelector, vSelector, maxStringBytes);
|
||||
} else {
|
||||
|
@ -255,6 +283,16 @@ public class StringFirstAggregatorFactory extends AggregatorFactory
|
|||
return Arrays.asList(timeColumn, fieldName);
|
||||
}
|
||||
|
||||
public boolean isGetFirstElementFromMvd()
|
||||
{
|
||||
return getFirstElementFromMvd;
|
||||
}
|
||||
|
||||
public void setGetFirstElementFromMvd(boolean getFirstElementFromMvd)
|
||||
{
|
||||
this.getFirstElementFromMvd = getFirstElementFromMvd;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getCacheKey()
|
||||
{
|
||||
|
|
|
@ -38,7 +38,7 @@ public class StringFirstVectorAggregator implements VectorAggregator
|
|||
private final BaseLongVectorValueSelector timeSelector;
|
||||
private final VectorObjectSelector valueSelector;
|
||||
private final int maxStringBytes;
|
||||
//protected long firstTime;
|
||||
|
||||
|
||||
public StringFirstVectorAggregator(
|
||||
BaseLongVectorValueSelector timeSelector,
|
||||
|
|
|
@ -1161,7 +1161,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
|
|||
public void testStringEarliestInSubquery()
|
||||
{
|
||||
testQuery(
|
||||
"SELECT SUM(val) FROM (SELECT dim2, EARLIEST(dim1, 10) AS val FROM foo GROUP BY dim2)",
|
||||
"SELECT SUM(val) FROM (SELECT dim2, EARLIEST(dim1,10) AS val FROM foo GROUP BY dim2)",
|
||||
ImmutableList.of(
|
||||
GroupByQuery.builder()
|
||||
.setDataSource(
|
||||
|
@ -1267,6 +1267,74 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
|
|||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStringEarliestSingleStringDim()
|
||||
{
|
||||
testQuery(
|
||||
"SELECT dim2, EARLIEST(dim1,10) AS val FROM foo GROUP BY dim2",
|
||||
ImmutableList.of(
|
||||
GroupByQuery.builder()
|
||||
.setDataSource(CalciteTests.DATASOURCE1)
|
||||
.setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0")))
|
||||
.setAggregatorSpecs(aggregators(new StringFirstAggregatorFactory(
|
||||
"a0",
|
||||
"dim1",
|
||||
null,
|
||||
10
|
||||
)))
|
||||
.setInterval(querySegmentSpec(Filtration.eternity()))
|
||||
.setGranularity(Granularities.ALL)
|
||||
.setContext(QUERY_CONTEXT_DEFAULT)
|
||||
.build()
|
||||
),
|
||||
NullHandling.sqlCompatible() ?
|
||||
ImmutableList.of(
|
||||
new Object[]{null, "10.1"},
|
||||
new Object[]{"", "2"},
|
||||
new Object[]{"a", ""},
|
||||
new Object[]{"abc", "def"}
|
||||
) : ImmutableList.of(
|
||||
new Object[]{"", "10.1"},
|
||||
new Object[]{"a", ""},
|
||||
new Object[]{"abc", "def"}
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStringEarliestMultiStringDim()
|
||||
{
|
||||
testQuery(
|
||||
"SELECT dim2, EARLIEST(dim3,10) AS val FROM foo GROUP BY dim2",
|
||||
ImmutableList.of(
|
||||
GroupByQuery.builder()
|
||||
.setDataSource(CalciteTests.DATASOURCE1)
|
||||
.setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0")))
|
||||
.setAggregatorSpecs(aggregators(new StringFirstAggregatorFactory(
|
||||
"a0",
|
||||
"dim3",
|
||||
null,
|
||||
10
|
||||
)))
|
||||
.setInterval(querySegmentSpec(Filtration.eternity()))
|
||||
.setGranularity(Granularities.ALL)
|
||||
.setContext(QUERY_CONTEXT_DEFAULT)
|
||||
.build()
|
||||
),
|
||||
NullHandling.sqlCompatible() ?
|
||||
ImmutableList.of(
|
||||
new Object[]{null, "[b, c]"},
|
||||
new Object[]{"", "d"},
|
||||
new Object[]{"a", "[a, b]"},
|
||||
new Object[]{"abc", null}
|
||||
) : ImmutableList.of(
|
||||
new Object[]{"", "[b, c]"},
|
||||
new Object[]{"a", "[a, b]"},
|
||||
new Object[]{"abc", ""}
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
// This test the off-heap (buffer) version of the AnyAggregator (String)
|
||||
@Test
|
||||
public void testStringAnyInSubquery()
|
||||
|
|
Loading…
Reference in New Issue