diff --git a/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java b/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java index acd984b6442..72859f02c9c 100644 --- a/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java +++ b/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java @@ -61,9 +61,10 @@ public class UnnestDataSource implements DataSource DimFilter unnestFilter ) { - this.base = dataSource; - this.virtualColumn = virtualColumn; - this.unnestFilter = unnestFilter; + // select * from UNNEST(ARRAY[1,2,3]) as somu(d3) where somu.d3 IN ('a','b') + this.base = dataSource; // table + this.virtualColumn = virtualColumn; // MV_TO_ARRAY + this.unnestFilter = unnestFilter; // d3 in (a,b) } @JsonCreator diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/MultiStringFirstDimensionVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/MultiStringFirstDimensionVectorAggregator.java new file mode 100644 index 00000000000..c13165a3944 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/MultiStringFirstDimensionVectorAggregator.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.first; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.aggregation.SerializablePairLongString; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.segment.data.IndexedInts; +import org.apache.druid.segment.vector.BaseLongVectorValueSelector; +import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +public class MultiStringFirstDimensionVectorAggregator implements VectorAggregator +{ + private final BaseLongVectorValueSelector timeSelector; + private final MultiValueDimensionVectorSelector valueDimensionVectorSelector; + private long firstTime; + private final int maxStringBytes; + private final boolean useDefault = NullHandling.replaceWithDefault(); + + public MultiStringFirstDimensionVectorAggregator( + BaseLongVectorValueSelector timeSelector, + MultiValueDimensionVectorSelector valueDimensionVectorSelector, + int maxStringBytes + ) + { + this.timeSelector = timeSelector; + this.valueDimensionVectorSelector = valueDimensionVectorSelector; + this.maxStringBytes = maxStringBytes; + firstTime = Long.MAX_VALUE; + } + + @Override + public void init(ByteBuffer buf, int position) + { + buf.putLong(position, Long.MAX_VALUE); + buf.put( + position + NumericFirstVectorAggregator.NULL_OFFSET, + useDefault ? NullHandling.IS_NOT_NULL_BYTE : NullHandling.IS_NULL_BYTE + ); + buf.putLong(position + NumericFirstVectorAggregator.VALUE_OFFSET, 0); + } + + @Override + public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) + { + final long[] timeVector = timeSelector.getLongVector(); + final IndexedInts[] valueVector = valueDimensionVectorSelector.getRowVector(); + firstTime = buf.getLong(position); + int index = startRow; + for (int i = startRow; i < endRow; i++) { + if (valueVector[i].get(0) != 0) { + index = i; + break; + } + } + + final long earliestTime = timeVector[index]; + if (earliestTime < firstTime) { + firstTime = earliestTime; + buf.putLong(position, firstTime); + buf.put(position + NumericFirstVectorAggregator.NULL_OFFSET, NullHandling.IS_NOT_NULL_BYTE); + buf.putInt(position + NumericFirstVectorAggregator.VALUE_OFFSET, valueVector[index].get(0)); + } + } + + @Override + public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset) + { + long[] timeVector = timeSelector.getLongVector(); + IndexedInts[] values = valueDimensionVectorSelector.getRowVector(); + for (int i = 0; i < numRows; i++) { + int position = positions[i] + positionOffset; + int row = rows == null ? i : rows[i]; + long firstTime = buf.getLong(position); + if (timeVector[row] < firstTime) { + firstTime = timeVector[row]; + buf.putLong(position, firstTime); + buf.put(position + NumericFirstVectorAggregator.NULL_OFFSET, NullHandling.IS_NOT_NULL_BYTE); + buf.putInt( + position + NumericFirstVectorAggregator.VALUE_OFFSET, + values[row].size() > 0 ? values[row].get(0) : 0 + ); + } + } + } + + @Nullable + @Override + public Object get(ByteBuffer buf, int position) + { + int index = buf.getInt(position + NumericFirstVectorAggregator.VALUE_OFFSET); + long earliest = buf.getLong(position); + String strValue = valueDimensionVectorSelector.lookupName(index); + return new SerializablePairLongString(earliest, StringUtils.chop(strValue, maxStringBytes)); + + } + + @Override + public void close() + { + + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/SingleStringFirstDimensionVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/SingleStringFirstDimensionVectorAggregator.java new file mode 100644 index 00000000000..2e6184273fd --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/SingleStringFirstDimensionVectorAggregator.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.first; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.aggregation.SerializablePairLongString; +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.segment.vector.BaseLongVectorValueSelector; +import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +public class SingleStringFirstDimensionVectorAggregator implements VectorAggregator +{ + private final BaseLongVectorValueSelector timeSelector; + private final SingleValueDimensionVectorSelector valueDimensionVectorSelector; + private long firstTime; + private final int maxStringBytes; + private final boolean useDefault = NullHandling.replaceWithDefault(); + + public SingleStringFirstDimensionVectorAggregator( + BaseLongVectorValueSelector timeSelector, + SingleValueDimensionVectorSelector valueDimensionVectorSelector, + int maxStringBytes + ) + { + this.timeSelector = timeSelector; + this.valueDimensionVectorSelector = valueDimensionVectorSelector; + this.maxStringBytes = maxStringBytes; + this.firstTime = Long.MAX_VALUE; + } + + @Override + public void init(ByteBuffer buf, int position) + { + buf.putLong(position, Long.MAX_VALUE); + buf.put( + position + NumericFirstVectorAggregator.NULL_OFFSET, + useDefault ? NullHandling.IS_NOT_NULL_BYTE : NullHandling.IS_NULL_BYTE + ); + buf.putLong(position + NumericFirstVectorAggregator.VALUE_OFFSET, 0); + } + + @Override + public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) + { + final long[] timeVector = timeSelector.getLongVector(); + final int[] valueVector = valueDimensionVectorSelector.getRowVector(); + firstTime = buf.getLong(position); + int index = startRow; + for (int i = startRow; i < endRow; i++) { + index = i; + break; + } + + final long earliestTime = timeVector[index]; + if (earliestTime < firstTime) { + firstTime = earliestTime; + buf.putLong(position, firstTime); + buf.put(position + NumericFirstVectorAggregator.NULL_OFFSET, NullHandling.IS_NOT_NULL_BYTE); + buf.putInt(position + NumericFirstVectorAggregator.VALUE_OFFSET, valueVector[index]); + } + } + + @Override + public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset) + { + long[] timeVector = timeSelector.getLongVector(); + int[] values = valueDimensionVectorSelector.getRowVector(); + for (int i = 0; i < numRows; i++) { + int position = positions[i] + positionOffset; + int row = rows == null ? i : rows[i]; + long firstTime = buf.getLong(position); + if (timeVector[row] < firstTime) { + firstTime = timeVector[row]; + buf.putLong(position, firstTime); + buf.put(position + NumericFirstVectorAggregator.NULL_OFFSET, NullHandling.IS_NOT_NULL_BYTE); + buf.putInt(position + NumericFirstVectorAggregator.VALUE_OFFSET, values[row]); + } + } + + } + + @Nullable + @Override + public Object get(ByteBuffer buf, int position) + { + int index = buf.getInt(position + NumericFirstVectorAggregator.VALUE_OFFSET); + long earliest = buf.getLong(position); + String strValue = valueDimensionVectorSelector.lookupName(index); + return new SerializablePairLongString(earliest, StringUtils.chop(strValue, maxStringBytes)); + } + + @Override + public void close() + { + // nothing to close + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java index 3cc31b2737a..d4463a07ee5 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java @@ -34,6 +34,7 @@ import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.SerializablePairLongString; import org.apache.druid.query.aggregation.VectorAggregator; import org.apache.druid.query.cache.CacheKeyBuilder; +import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.segment.BaseObjectColumnValueSelector; import org.apache.druid.segment.ColumnInspector; import org.apache.druid.segment.ColumnSelectorFactory; @@ -41,7 +42,10 @@ import org.apache.druid.segment.NilColumnValueSelector; import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.vector.BaseLongVectorValueSelector; +import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector; +import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector; import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import org.apache.druid.segment.vector.VectorObjectSelector; @@ -105,6 +109,8 @@ public class StringFirstAggregatorFactory extends AggregatorFactory private final String timeColumn; protected final int maxStringBytes; + private boolean getFirstElementFromMvd; + @JsonCreator public StringFirstAggregatorFactory( @JsonProperty("name") String name, @@ -126,8 +132,10 @@ public class StringFirstAggregatorFactory extends AggregatorFactory this.maxStringBytes = maxStringBytes == null ? StringFirstAggregatorFactory.DEFAULT_MAX_STRING_SIZE : maxStringBytes; + this.getFirstElementFromMvd = false; } + @Override public Aggregator factorize(ColumnSelectorFactory metricFactory) { @@ -163,10 +171,30 @@ public class StringFirstAggregatorFactory extends AggregatorFactory @Override public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory) { - ColumnCapabilities capabilities = selectorFactory.getColumnCapabilities(fieldName); - VectorObjectSelector vSelector = selectorFactory.makeObjectSelector(fieldName); BaseLongVectorValueSelector timeSelector = (BaseLongVectorValueSelector) selectorFactory.makeValueSelector( timeColumn); + ColumnCapabilities capabilities = selectorFactory.getColumnCapabilities(fieldName); + if (capabilities != null) { + if (capabilities.is(ValueType.STRING) && capabilities.isDictionaryEncoded().isTrue()) { + // Case 1: Multivalue string with dimension selector + if (capabilities.hasMultipleValues().isTrue()) { + if (isGetFirstElementFromMvd()) { + MultiValueDimensionVectorSelector mSelector = selectorFactory.makeMultiValueDimensionSelector( + DefaultDimensionSpec.of( + fieldName)); + return new MultiStringFirstDimensionVectorAggregator(timeSelector, mSelector, maxStringBytes); + } + } else { + // Case 2: Single string with dimension selector + SingleValueDimensionVectorSelector sSelector = selectorFactory.makeSingleValueDimensionSelector( + DefaultDimensionSpec.of( + fieldName)); + return new SingleStringFirstDimensionVectorAggregator(timeSelector, sSelector, maxStringBytes); + } + } + } + // Case 3: return vector object selector + VectorObjectSelector vSelector = selectorFactory.makeObjectSelector(fieldName); if (capabilities != null) { return new StringFirstVectorAggregator(timeSelector, vSelector, maxStringBytes); } else { @@ -255,6 +283,16 @@ public class StringFirstAggregatorFactory extends AggregatorFactory return Arrays.asList(timeColumn, fieldName); } + public boolean isGetFirstElementFromMvd() + { + return getFirstElementFromMvd; + } + + public void setGetFirstElementFromMvd(boolean getFirstElementFromMvd) + { + this.getFirstElementFromMvd = getFirstElementFromMvd; + } + @Override public byte[] getCacheKey() { diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstVectorAggregator.java index 7c94c202887..ea77084518a 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstVectorAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstVectorAggregator.java @@ -38,7 +38,7 @@ public class StringFirstVectorAggregator implements VectorAggregator private final BaseLongVectorValueSelector timeSelector; private final VectorObjectSelector valueSelector; private final int maxStringBytes; - //protected long firstTime; + public StringFirstVectorAggregator( BaseLongVectorValueSelector timeSelector, diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 8491fdab637..12a5443df35 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -1161,7 +1161,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest public void testStringEarliestInSubquery() { testQuery( - "SELECT SUM(val) FROM (SELECT dim2, EARLIEST(dim1, 10) AS val FROM foo GROUP BY dim2)", + "SELECT SUM(val) FROM (SELECT dim2, EARLIEST(dim1,10) AS val FROM foo GROUP BY dim2)", ImmutableList.of( GroupByQuery.builder() .setDataSource( @@ -1267,6 +1267,74 @@ public class CalciteQueryTest extends BaseCalciteQueryTest ); } + @Test + public void testStringEarliestSingleStringDim() + { + testQuery( + "SELECT dim2, EARLIEST(dim1,10) AS val FROM foo GROUP BY dim2", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0"))) + .setAggregatorSpecs(aggregators(new StringFirstAggregatorFactory( + "a0", + "dim1", + null, + 10 + ))) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + NullHandling.sqlCompatible() ? + ImmutableList.of( + new Object[]{null, "10.1"}, + new Object[]{"", "2"}, + new Object[]{"a", ""}, + new Object[]{"abc", "def"} + ) : ImmutableList.of( + new Object[]{"", "10.1"}, + new Object[]{"a", ""}, + new Object[]{"abc", "def"} + ) + ); + } + + @Test + public void testStringEarliestMultiStringDim() + { + testQuery( + "SELECT dim2, EARLIEST(dim3,10) AS val FROM foo GROUP BY dim2", + ImmutableList.of( + GroupByQuery.builder() + .setDataSource(CalciteTests.DATASOURCE1) + .setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0"))) + .setAggregatorSpecs(aggregators(new StringFirstAggregatorFactory( + "a0", + "dim3", + null, + 10 + ))) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setContext(QUERY_CONTEXT_DEFAULT) + .build() + ), + NullHandling.sqlCompatible() ? + ImmutableList.of( + new Object[]{null, "[b, c]"}, + new Object[]{"", "d"}, + new Object[]{"a", "[a, b]"}, + new Object[]{"abc", null} + ) : ImmutableList.of( + new Object[]{"", "[b, c]"}, + new Object[]{"a", "[a, b]"}, + new Object[]{"abc", ""} + ) + ); + } + // This test the off-heap (buffer) version of the AnyAggregator (String) @Test public void testStringAnyInSubquery()