Unnest functionality for Druid (#13268)

* Moving all unnest cursor code atop refactored code for unnest

* Updating unnest cursor

* Removing dedup and fixing up some null checks

* AllowList changes

* Fixing some NPEs

* Using bitset for allowlist

* Updating the initialization only when cursor is in non-done state

* Updating code to skip rows not in allow list

* Adding a flag for cases when first element is not in allowed list

* Updating for a null in allowList

* Splitting unnest cursor into 2 subclasses

* Intercepting some apis with columnName for new unnested column

* Adding test cases and renaming some stuff

* checkstyle fixes

* Moving to an interface for Unnest

* handling null rows in a dimension

* Updating cursors after comments part-1

* Addressing comments and adding some more tests

* Reverting a change to ScanQueryRunner and improving a comment

* removing an unused function

* Updating cursors after comments part 2

* One last fix for review comments

* Making some functions private, deleting some comments, adding a test for unnest of unnest with allowList

* Adding an exception for a case

* Closure for unnest data source

* Adding some javadocs

* One minor change in makeDimSelector of columnarCursor

* Updating an error message

* Update processing/src/main/java/org/apache/druid/segment/DimensionUnnestCursor.java

Co-authored-by: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com>

* Unnesting on virtual columns was missing an object array, adding that to support virtual columns unnesting

* Updating exceptions to use UOE

* Renamed files, added column capability test on adapter, return statement and made unnest datasource not cacheable for the time being

* Handling for null values in dim selector

* Fixing a NPE for null row

* Updating capabilities

* Updating capabilities

Co-authored-by: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com>
This commit is contained in:
somu-imply 2022-12-02 18:48:25 -08:00 committed by GitHub
parent 78c1a2bd66
commit 9177419628
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 2602 additions and 11 deletions

View File

@ -41,7 +41,8 @@ import java.util.function.Function;
@JsonSubTypes.Type(value = JoinDataSource.class, name = "join"),
@JsonSubTypes.Type(value = LookupDataSource.class, name = "lookup"),
@JsonSubTypes.Type(value = InlineDataSource.class, name = "inline"),
@JsonSubTypes.Type(value = GlobalTableDataSource.class, name = "globalTable")
@JsonSubTypes.Type(value = GlobalTableDataSource.class, name = "globalTable"),
@JsonSubTypes.Type(value = UnnestDataSource.class, name = "unnest")
})
public interface DataSource
{

View File

@ -0,0 +1,212 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.UnnestSegmentReference;
import org.apache.druid.utils.JvmUtils;
import javax.annotation.Nullable;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
/**
* The data source for representing an unnest operation.
*
* An unnest data source has the following:
* a base data source which is to be unnested
* the column name of the MVD which will be unnested
* the name of the column that will hold the unnested values
* and an allowlist serving as a filter of which values in the MVD will be unnested.
*/
public class UnnestDataSource implements DataSource
{
  private final DataSource base;
  private final String column;
  private final String outputName;
  @Nullable
  private final LinkedHashSet<String> allowList;

  private UnnestDataSource(
      DataSource dataSource,
      String columnName,
      String outputName,
      @Nullable LinkedHashSet<String> allowList
  )
  {
    this.base = dataSource;
    this.column = columnName;
    this.outputName = outputName;
    this.allowList = allowList;
  }

  /**
   * JSON factory for an unnest data source.
   *
   * @param base       the data source whose rows are unnested
   * @param columnName the column of {@code base} to unnest
   * @param outputName the name of the column holding the unnested values
   * @param allowList  optional set of values to keep; may be null
   */
  @JsonCreator
  public static UnnestDataSource create(
      @JsonProperty("base") DataSource base,
      @JsonProperty("column") String columnName,
      @JsonProperty("outputName") String outputName,
      @Nullable @JsonProperty("allowList") LinkedHashSet<String> allowList
  )
  {
    return new UnnestDataSource(base, columnName, outputName, allowList);
  }

  @JsonProperty("base")
  public DataSource getBase()
  {
    return base;
  }

  @JsonProperty("column")
  public String getColumn()
  {
    return column;
  }

  @JsonProperty("outputName")
  public String getOutputName()
  {
    return outputName;
  }

  @Nullable
  @JsonProperty("allowList")
  public LinkedHashSet<String> getAllowList()
  {
    return allowList;
  }

  @Override
  public Set<String> getTableNames()
  {
    return base.getTableNames();
  }

  @Override
  public List<DataSource> getChildren()
  {
    return ImmutableList.of(base);
  }

  /**
   * Returns a copy of this data source that unnests the single given child instead of the current base.
   *
   * @throws IAE if {@code children} does not contain exactly one element
   */
  @Override
  public DataSource withChildren(List<DataSource> children)
  {
    if (children.size() != 1) {
      throw new IAE("Expected [1] child, got [%d]", children.size());
    }
    return new UnnestDataSource(children.get(0), column, outputName, allowList);
  }

  @Override
  public boolean isCacheable(boolean isBroker)
  {
    // Not cacheable for now; see getCacheKey().
    return false;
  }

  @Override
  public boolean isGlobal()
  {
    return base.isGlobal();
  }

  @Override
  public boolean isConcrete()
  {
    return base.isConcrete();
  }

  /**
   * Builds the segment mapping function: the base data source's mapping is applied first and, when a
   * non-empty column is configured, its result is wrapped in an {@link UnnestSegmentReference}.
   */
  @Override
  public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
      Query query,
      AtomicLong cpuTimeAccumulator
  )
  {
    final Function<SegmentReference, SegmentReference> segmentMapFn = base.createSegmentMapFunction(
        query,
        cpuTimeAccumulator
    );
    return JvmUtils.safeAccumulateThreadCpuTime(
        cpuTimeAccumulator,
        () -> {
          // Without a column to unnest there is nothing to wrap; use the base mapping as is.
          if (column == null || column.isEmpty()) {
            return segmentMapFn;
          }
          return baseSegment ->
              new UnnestSegmentReference(
                  segmentMapFn.apply(baseSegment),
                  column,
                  outputName,
                  allowList
              );
        }
    );
  }

  @Override
  public DataSource withUpdatedDataSource(DataSource newSource)
  {
    return new UnnestDataSource(newSource, column, outputName, allowList);
  }

  @Override
  public byte[] getCacheKey()
  {
    // The column being unnested would need to be part of the cache key
    // as the results are dependent on what column is being unnested.
    // Currently, it is not cacheable.
    // Future development should use the table name and column name to
    // create an appropriate cache key.
    return null;
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    final UnnestDataSource that = (UnnestDataSource) o;
    // Null-safe comparison: createSegmentMapFunction tolerates a null column, so equals must as well.
    // The allowList is compared too, because it changes which values this data source produces.
    return Objects.equals(column, that.column)
           && Objects.equals(outputName, that.outputName)
           && Objects.equals(base, that.base)
           && Objects.equals(allowList, that.allowList);
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(base, column, outputName, allowList);
  }
}

View File

@ -28,6 +28,7 @@ import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnionDataSource;
import org.apache.druid.query.UnnestDataSource;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.spec.QuerySegmentSpec;
@ -112,17 +113,29 @@ public class DataSourceAnalysis
Query<?> baseQuery = null;
DataSource current = dataSource;
while (current instanceof QueryDataSource) {
final Query<?> subQuery = ((QueryDataSource) current).getQuery();
// This needs to be an or condition between QueryDataSource and UnnestDataSource
// As queries can have interleaving query and unnest data sources.
// Ideally if each data source generate their own analysis object we can avoid the or here
// and have cleaner code. Especially as we increase the types of data sources in future
// these or checks will be tedious. Future development should move forDataSource method
// into each data source.
if (!(subQuery instanceof BaseQuery)) {
// We must verify that the subQuery is a BaseQuery, because it is required to make "getBaseQuerySegmentSpec"
// work properly. All built-in query types are BaseQuery, so we only expect this with funky extension queries.
throw new IAE("Cannot analyze subquery of class[%s]", subQuery.getClass().getName());
while (current instanceof QueryDataSource || current instanceof UnnestDataSource) {
if (current instanceof QueryDataSource) {
final Query<?> subQuery = ((QueryDataSource) current).getQuery();
if (!(subQuery instanceof BaseQuery)) {
// We must verify that the subQuery is a BaseQuery, because it is required to make "getBaseQuerySegmentSpec"
// work properly. All built-in query types are BaseQuery, so we only expect this with funky extension queries.
throw new IAE("Cannot analyze subquery of class[%s]", subQuery.getClass().getName());
}
baseQuery = subQuery;
current = subQuery.getDataSource();
} else {
final UnnestDataSource unnestDataSource = (UnnestDataSource) current;
current = unnestDataSource.getBase();
}
baseQuery = subQuery;
current = subQuery.getDataSource();
}
if (current instanceof JoinDataSource) {
@ -276,7 +289,8 @@ public class DataSourceAnalysis
/**
* Returns true if this datasource is concrete-based (see {@link #isConcreteBased()}, and the base datasource is a
* {@link TableDataSource} or a {@link UnionDataSource} composed entirely of {@link TableDataSource}. This is an
* {@link TableDataSource} or a {@link UnionDataSource} composed entirely of {@link TableDataSource}
* or an {@link UnnestDataSource} composed entirely of {@link TableDataSource} . This is an
* important property, because it corresponds to datasources that can be handled by Druid's distributed query stack.
*/
public boolean isConcreteTableBased()
@ -286,6 +300,10 @@ public class DataSourceAnalysis
// so check anyway for future-proofing.
return isConcreteBased() && (baseDataSource instanceof TableDataSource
|| (baseDataSource instanceof UnionDataSource &&
baseDataSource.getChildren()
.stream()
.allMatch(ds -> ds instanceof TableDataSource))
|| (baseDataSource instanceof UnnestDataSource &&
baseDataSource.getChildren()
.stream()
.allMatch(ds -> ds instanceof TableDataSource)));
@ -298,6 +316,7 @@ public class DataSourceAnalysis
{
return dataSource instanceof QueryDataSource;
}
/**
* Returns true if this datasource is made out of a join operation

View File

@ -0,0 +1,336 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
/**
* The cursor to help unnest MVDs without dictionary encoding and ARRAY type selectors.
* <p>
* Consider a segment has 2 rows
* ['a', 'b', 'c']
* ['d', 'e']
* <p>
* The baseCursor points to the row ['a', 'b', 'c']
* while the unnestCursor with each call of advance() moves over individual elements.
* <p>
* unnestCursor.advance() -> 'a'
* unnestCursor.advance() -> 'b'
* unnestCursor.advance() -> 'c'
* unnestCursor.advance() -> 'd' (advances base cursor first)
* unnestCursor.advance() -> 'e'
* <p>
* <p>
* The allowSet if available helps skip over elements which are not in the allowList by moving the cursor to
* the next available match.
* <p>
* The index reference points to the index of each row that the unnest cursor is accessing through currentVal
* The index ranges from 0 to the size of the list in each row which is held in the unnestListForCurrentRow
* <p>
* The needInitialization flag sets up the initial values of unnestListForCurrentRow at the beginning of the segment
*/
public class UnnestColumnValueSelectorCursor implements Cursor
{
  private final Cursor baseCursor;
  private final ColumnSelectorFactory baseColumnSelectorFactory;
  private final ColumnValueSelector columnValueSelector;
  private final String columnName;
  private final String outputName;
  private final LinkedHashSet<String> allowSet;
  private int index;
  private Object currentVal;
  private List<Object> unnestListForCurrentRow;
  private boolean needInitialization;

  /**
   * @param cursor                   the base cursor iterating over the rows to unnest
   * @param baseColumSelectorFactory selector factory of the base cursor
   * @param columnName               the column to unnest
   * @param outputColumnName         the name under which unnested values are exposed
   * @param allowSet                 values to keep; null or empty keeps every value
   */
  public UnnestColumnValueSelectorCursor(
      Cursor cursor,
      ColumnSelectorFactory baseColumSelectorFactory,
      String columnName,
      String outputColumnName,
      LinkedHashSet<String> allowSet
  )
  {
    this.baseCursor = cursor;
    this.baseColumnSelectorFactory = baseColumSelectorFactory;
    this.columnValueSelector = this.baseColumnSelectorFactory.makeColumnValueSelector(columnName);
    this.columnName = columnName;
    this.index = 0;
    this.outputName = outputColumnName;
    this.needInitialization = true;
    this.allowSet = allowSet;
  }

  /**
   * Returns a factory that delegates to the base factory for every column except the output column,
   * for which it serves the element the unnest cursor currently points at.
   */
  @Override
  public ColumnSelectorFactory getColumnSelectorFactory()
  {
    return new ColumnSelectorFactory()
    {
      @Override
      public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec)
      {
        if (!outputName.equals(dimensionSpec.getDimension())) {
          return baseColumnSelectorFactory.makeDimensionSelector(dimensionSpec);
        }
        throw new UOE("Unsupported dimension selector while using column value selector for column [%s]", outputName);
      }

      @Override
      public ColumnValueSelector makeColumnValueSelector(String columnName)
      {
        if (!outputName.equals(columnName)) {
          return baseColumnSelectorFactory.makeColumnValueSelector(columnName);
        }
        return new ColumnValueSelector()
        {
          @Override
          public double getDouble()
          {
            Object value = getObject();
            if (value == null) {
              return 0;
            }
            if (value instanceof Number) {
              return ((Number) value).doubleValue();
            }
            throw new UOE("Cannot convert object to double");
          }

          @Override
          public float getFloat()
          {
            Object value = getObject();
            if (value == null) {
              return 0;
            }
            if (value instanceof Number) {
              return ((Number) value).floatValue();
            }
            throw new UOE("Cannot convert object to float");
          }

          @Override
          public long getLong()
          {
            Object value = getObject();
            if (value == null) {
              return 0;
            }
            if (value instanceof Number) {
              return ((Number) value).longValue();
            }
            throw new UOE("Cannot convert object to long");
          }

          @Override
          public void inspectRuntimeShape(RuntimeShapeInspector inspector)
          {
            columnValueSelector.inspectRuntimeShape(inspector);
          }

          @Override
          public boolean isNull()
          {
            return getObject() == null;
          }

          @Nullable
          @Override
          public Object getObject()
          {
            // An empty row, or an element rejected by the allow list, is reported as null.
            if (unnestListForCurrentRow.isEmpty() || !currentElementAllowed()) {
              return null;
            }
            return unnestListForCurrentRow.get(index);
          }

          @Override
          public Class<?> classOfObject()
          {
            return Object.class;
          }
        };
      }

      @Nullable
      @Override
      public ColumnCapabilities getColumnCapabilities(String column)
      {
        if (!outputName.equals(column)) {
          return baseColumnSelectorFactory.getColumnCapabilities(column);
        }
        final ColumnCapabilities capabilities = baseColumnSelectorFactory.getColumnCapabilities(columnName);
        if (capabilities == null) {
          // The base factory has no capabilities for the unnested column; nothing to derive from.
          return null;
        }
        if (capabilities.isArray()) {
          return ColumnCapabilitiesImpl.copyOf(capabilities).setType(capabilities.getElementType());
        }
        if (capabilities.hasMultipleValues().isTrue()) {
          return ColumnCapabilitiesImpl.copyOf(capabilities).setHasMultipleValues(false);
        }
        return capabilities;
      }
    };
  }

  @Override
  public DateTime getTime()
  {
    return baseCursor.getTime();
  }

  @Override
  public void advance()
  {
    advanceUninterruptibly();
    BaseQuery.checkInterrupted();
  }

  @Override
  public void advanceUninterruptibly()
  {
    // Keep stepping until an allowed element is reached or the base cursor runs out.
    do {
      advanceAndUpdate();
    } while (matchAndProceed());
  }

  @Override
  public boolean isDone()
  {
    if (needInitialization && !baseCursor.isDone()) {
      initialize();
    }
    return baseCursor.isDone();
  }

  @Override
  public boolean isDoneOrInterrupted()
  {
    if (needInitialization && !baseCursor.isDoneOrInterrupted()) {
      initialize();
    }
    return baseCursor.isDoneOrInterrupted();
  }

  @Override
  public void reset()
  {
    index = 0;
    needInitialization = true;
    baseCursor.reset();
  }

  /**
   * Reads the value of the unnested column at the base cursor's current row and exposes it as the
   * list of elements to iterate over. A List or Object[] value is used as is (the array through a
   * list view); any other value, including null, becomes a fresh single-element list. A new list is
   * always assigned so the previous row's list, which may belong to the base selector, is never mutated.
   */
  @SuppressWarnings("unchecked")
  private void getNextRow()
  {
    currentVal = this.columnValueSelector.getObject();
    if (currentVal instanceof List) {
      unnestListForCurrentRow = (List<Object>) currentVal;
    } else if (currentVal instanceof Object[]) {
      unnestListForCurrentRow = Arrays.asList((Object[]) currentVal);
    } else {
      final List<Object> singleValue = new ArrayList<>(1);
      singleValue.add(currentVal);
      unnestListForCurrentRow = singleValue;
    }
  }

  /**
   * This initializes the unnest cursor by loading the first row of the base cursor.
   * If an allowList is specified and the first element is not in it, the cursor
   * is advanced to the first element that is.
   */
  private void initialize()
  {
    getNextRow();
    if (!currentElementAllowed()) {
      advance();
    }
    needInitialization = false;
  }

  /**
   * This advances the cursor to move to the next element to be unnested.
   * When the last element in a row is unnested, it is also responsible
   * to move the base cursor to the next row for unnesting and repopulates
   * the list of elements to point to the new row
   */
  private void advanceAndUpdate()
  {
    if (unnestListForCurrentRow.isEmpty() || index >= unnestListForCurrentRow.size() - 1) {
      index = 0;
      // This is reached from advanceUninterruptibly(), so the base cursor must not be advanced
      // through its interruptible advance().
      if (!baseCursor.isDone()) {
        baseCursor.advanceUninterruptibly();
      }
      if (!baseCursor.isDone()) {
        getNextRow();
      }
    } else {
      index++;
    }
  }

  /**
   * This advances the unnest cursor in cases where an allowList is specified
   * and the current value at the unnest cursor is not in the allowList.
   * The cursor in such cases is moved till the next match is found.
   *
   * @return true if the cursor has to keep moving, false if it should stay where it is
   */
  private boolean matchAndProceed()
  {
    // Check for exhaustion first: once the base cursor is done the row list is stale.
    return !baseCursor.isDone() && !currentElementAllowed();
  }

  /**
   * Tells whether the element at the current index passes the allowList. Without an allowList every
   * position passes; with one, an empty row has no element that could pass.
   */
  private boolean currentElementAllowed()
  {
    if (allowSet == null || allowSet.isEmpty()) {
      return true;
    }
    if (unnestListForCurrentRow.isEmpty()) {
      return false;
    }
    return allowSet.contains((String) unnestListForCurrentRow.get(index));
  }
}

View File

@ -0,0 +1,415 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment;
import com.google.common.base.Predicate;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.filter.ValueMatcher;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.data.IndexedInts;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.BitSet;
import java.util.LinkedHashSet;
/**
* The cursor to help unnest MVDs with dictionary encoding.
* Consider a segment has 2 rows
* ['a', 'b', 'c']
* ['d', 'c']
* <p>
* Considering dictionary encoding, these are represented as
* <p>
* 'a' -> 0
* 'b' -> 1
* 'c' -> 2
* 'd' -> 3
* <p>
* The baseCursor points to the row of IndexedInts [0, 1, 2]
* while the unnestCursor with each call of advance() moves over individual elements.
* <p>
* advance() -> 0 -> 'a'
* advance() -> 1 -> 'b'
* advance() -> 2 -> 'c'
* advance() -> 3 -> 'd' (advances base cursor first)
* advance() -> 2 -> 'c'
* <p>
* Total 5 advance calls above
* <p>
* The allowSet, if available, helps skip over elements that are not in the allowList by moving the cursor to
* the next available match. The hashSet is converted into a bitset (during initialization) for efficiency.
* If allowSet is ['c', 'd'] then the advance moves over to the next available match
* <p>
* advance() -> 2 -> 'c'
* advance() -> 3 -> 'd' (advances base cursor first)
* advance() -> 2 -> 'c'
* <p>
* Total 3 advance calls in this case
* <p>
* The index reference points to the index of each row that the unnest cursor is accessing
* The indexedInts for each row are held in the indexedIntsForCurrentRow object
* <p>
* The needInitialization flag sets up the initial values of indexedIntsForCurrentRow at the beginning of the segment
*/
public class UnnestDimensionCursor implements Cursor
{
  private final Cursor baseCursor;
  private final DimensionSelector dimSelector;
  private final String columnName;
  private final String outputName;
  private final LinkedHashSet<String> allowSet;
  private final BitSet allowedBitSet;
  private final ColumnSelectorFactory baseColumnSelectorFactory;
  private int index;
  private IndexedInts indexedIntsForCurrentRow;
  private boolean needInitialization;
  private SingleIndexInts indexIntsForRow;

  /**
   * @param cursor                    the base cursor iterating over the rows to unnest
   * @param baseColumnSelectorFactory selector factory of the base cursor
   * @param columnName                the dimension to unnest
   * @param outputColumnName          the name under which unnested values are exposed
   * @param allowSet                  values to keep; null or empty keeps every value
   */
  public UnnestDimensionCursor(
      Cursor cursor,
      ColumnSelectorFactory baseColumnSelectorFactory,
      String columnName,
      String outputColumnName,
      LinkedHashSet<String> allowSet
  )
  {
    this.baseCursor = cursor;
    this.baseColumnSelectorFactory = baseColumnSelectorFactory;
    this.dimSelector = this.baseColumnSelectorFactory.makeDimensionSelector(DefaultDimensionSpec.of(columnName));
    this.columnName = columnName;
    this.index = 0;
    this.outputName = outputColumnName;
    this.needInitialization = true;
    this.allowSet = allowSet;
    this.allowedBitSet = new BitSet();
  }

  /**
   * Returns a factory that delegates to the base factory for every column except the output column,
   * for which it serves the single dictionary id the unnest cursor currently points at.
   */
  @Override
  public ColumnSelectorFactory getColumnSelectorFactory()
  {
    return new ColumnSelectorFactory()
    {
      @Override
      public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec)
      {
        if (!outputName.equals(dimensionSpec.getDimension())) {
          return baseColumnSelectorFactory.makeDimensionSelector(dimensionSpec);
        }

        return new DimensionSelector()
        {
          @Override
          public IndexedInts getRow()
          {
            // This object reference has been created
            // during the call to initialize and referenced henceforth
            return indexIntsForRow;
          }

          @Override
          public ValueMatcher makeValueMatcher(@Nullable String value)
          {
            final int idForLookup = idLookup().lookupId(value);
            if (idForLookup < 0) {
              // The value is not in the dictionary, so no row can ever match it.
              return new ValueMatcher()
              {
                @Override
                public boolean matches()
                {
                  return false;
                }

                @Override
                public void inspectRuntimeShape(RuntimeShapeInspector inspector)
                {

                }
              };
            }

            return new ValueMatcher()
            {
              @Override
              public boolean matches()
              {
                return idForLookup == indexedIntsForCurrentRow.get(index);
              }

              @Override
              public void inspectRuntimeShape(RuntimeShapeInspector inspector)
              {
                dimSelector.inspectRuntimeShape(inspector);
              }
            };
          }

          @Override
          public ValueMatcher makeValueMatcher(Predicate<String> predicate)
          {
            return DimensionSelectorUtils.makeValueMatcherGeneric(this, predicate);
          }

          @Override
          public void inspectRuntimeShape(RuntimeShapeInspector inspector)
          {
            dimSelector.inspectRuntimeShape(inspector);
          }

          @Nullable
          @Override
          public Object getObject()
          {
            // A missing or empty row has no element to look up.
            if (indexedIntsForCurrentRow == null || indexedIntsForCurrentRow.size() == 0) {
              return null;
            }
            if (!isCurrentElementAllowed()) {
              return null;
            }
            return lookupName(indexedIntsForCurrentRow.get(index));
          }

          @Override
          public Class<?> classOfObject()
          {
            return Object.class;
          }

          @Override
          public int getValueCardinality()
          {
            // NOTE(review): ids served by getRow() are still the base selector's dictionary ids, so they
            // may exceed the cardinality reported here when an allowList is active -- confirm with callers.
            if (!allowedBitSet.isEmpty()) {
              return allowedBitSet.cardinality();
            }
            return dimSelector.getValueCardinality();
          }

          @Nullable
          @Override
          public String lookupName(int id)
          {
            return dimSelector.lookupName(id);
          }

          @Override
          public boolean nameLookupPossibleInAdvance()
          {
            return dimSelector.nameLookupPossibleInAdvance();
          }

          @Nullable
          @Override
          public IdLookup idLookup()
          {
            return dimSelector.idLookup();
          }
        };
      }

      /*
      This ideally should not be called. If called delegate using the makeDimensionSelector
       */
      @Override
      public ColumnValueSelector makeColumnValueSelector(String columnName)
      {
        if (!outputName.equals(columnName)) {
          return baseColumnSelectorFactory.makeColumnValueSelector(columnName);
        }
        return makeDimensionSelector(DefaultDimensionSpec.of(columnName));
      }

      @Nullable
      @Override
      public ColumnCapabilities getColumnCapabilities(String column)
      {
        if (!outputName.equals(column)) {
          return baseColumnSelectorFactory.getColumnCapabilities(column);
        }
        // The output column reports the capabilities of the column being unnested, with an ARRAY type
        // stripped down to its element type and a multi-value column reported as single-valued.
        final ColumnCapabilities capabilities = baseColumnSelectorFactory.getColumnCapabilities(columnName);
        if (capabilities == null) {
          // The base factory has no capabilities for the unnested column; nothing to derive from.
          return null;
        }
        if (capabilities.isArray()) {
          return ColumnCapabilitiesImpl.copyOf(capabilities).setType(capabilities.getElementType());
        }
        if (capabilities.hasMultipleValues().isTrue()) {
          return ColumnCapabilitiesImpl.copyOf(capabilities).setHasMultipleValues(false);
        }
        return capabilities;
      }
    };
  }

  @Override
  public DateTime getTime()
  {
    return baseCursor.getTime();
  }

  @Override
  public void advance()
  {
    advanceUninterruptibly();
    BaseQuery.checkInterrupted();
  }

  @Override
  public void advanceUninterruptibly()
  {
    // Keep stepping until an allowed element is reached or the base cursor runs out.
    do {
      advanceAndUpdate();
    } while (matchAndProceed());
  }

  @Override
  public boolean isDone()
  {
    if (needInitialization && !baseCursor.isDone()) {
      initialize();
    }
    return baseCursor.isDone();
  }

  @Override
  public boolean isDoneOrInterrupted()
  {
    if (needInitialization && !baseCursor.isDoneOrInterrupted()) {
      initialize();
    }
    return baseCursor.isDoneOrInterrupted();
  }

  @Override
  public void reset()
  {
    index = 0;
    needInitialization = true;
    baseCursor.reset();
  }

  /**
   * This initializes the unnest cursor and creates data structures
   * to start iterating over the values to be unnested.
   * This would also create a bitset for dictionary encoded columns to
   * check for matching values specified in allowedList of UnnestDataSource.
   * If the first element is not allowed, the cursor is advanced to the first one that is.
   */
  private void initialize()
  {
    final IdLookup idLookup = dimSelector.idLookup();
    this.indexIntsForRow = new SingleIndexInts();
    if (allowSet != null && !allowSet.isEmpty() && idLookup != null) {
      for (String allowedValue : allowSet) {
        final int id = idLookup.lookupId(allowedValue);
        if (id >= 0) {
          allowedBitSet.set(id);
        }
      }
    }
    this.indexedIntsForCurrentRow = readCurrentRow();
    // Uses the same rule as matchAndProceed(), so a first element that is not allowed is never exposed,
    // including when the row is null or no allowList value exists in the dictionary.
    if (!isCurrentElementAllowed()) {
      advance();
    }
    needInitialization = false;
  }

  /**
   * Reads the row at the base cursor's current position, or null when the base selector
   * reports a null object for it.
   */
  @Nullable
  private IndexedInts readCurrentRow()
  {
    if (dimSelector.getObject() == null) {
      return null;
    }
    return dimSelector.getRow();
  }

  /**
   * This advances the cursor to move to the next element to be unnested.
   * When the last element in a row is unnested, it is also responsible
   * to move the base cursor to the next row for unnesting and repopulates
   * the data structures, created during initialize(), to point to the new row
   */
  private void advanceAndUpdate()
  {
    if (indexedIntsForCurrentRow == null) {
      index = 0;
      if (!baseCursor.isDone()) {
        baseCursor.advanceUninterruptibly();
      }
      // Pick up the new row; otherwise a null first row would hide every row that follows it.
      if (!baseCursor.isDone()) {
        indexedIntsForCurrentRow = readCurrentRow();
      }
    } else {
      if (index >= indexedIntsForCurrentRow.size() - 1) {
        if (!baseCursor.isDone()) {
          baseCursor.advanceUninterruptibly();
        }
        if (!baseCursor.isDone()) {
          indexedIntsForCurrentRow = dimSelector.getRow();
        }
        index = 0;
      } else {
        ++index;
      }
    }
  }

  /**
   * This advances the unnest cursor in cases where an allowList is specified
   * and the current value at the unnest cursor is not in the allowList.
   * The cursor in such cases is moved till the next match is found.
   *
   * @return true if the cursor has to keep moving, false if it should stay where it is
   */
  private boolean matchAndProceed()
  {
    // Check for exhaustion first: once the base cursor is done the row reference is stale.
    return !baseCursor.isDone() && !isCurrentElementAllowed();
  }

  /**
   * Tells whether the element at the current index passes the allowList. Without an allowList every
   * position passes; with one, a null row or an index past the end of the row never passes.
   */
  private boolean isCurrentElementAllowed()
  {
    if ((allowSet == null || allowSet.isEmpty()) && allowedBitSet.isEmpty()) {
      return true;
    }
    if (indexedIntsForCurrentRow == null || index >= indexedIntsForCurrentRow.size()) {
      return false;
    }
    return allowedBitSet.get(indexedIntsForCurrentRow.get(index));
  }

  // Helper class to help in returning
  // getRow from the dimensionSelector
  // This is set in the initialize method
  private class SingleIndexInts implements IndexedInts
  {

    @Override
    public void inspectRuntimeShape(RuntimeShapeInspector inspector)
    {
      //nothing to inspect
    }

    @Override
    public int size()
    {
      // After unnest each row will have a single element
      return 1;
    }

    @Override
    public int get(int idx)
    {
      // The requested position is ignored: the only element is the one the unnest cursor points at.
      return indexedIntsForCurrentRow.get(index);
    }
  }
}

View File

@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.utils.CloseableUtils;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.Optional;
/**
* The segment reference for the Unnest Data Source.
* The input column name, output name and the allowSet follow from {@link org.apache.druid.query.UnnestDataSource}
*/
public class UnnestSegmentReference implements SegmentReference
{
  private static final Logger log = new Logger(UnnestSegmentReference.class);

  private final SegmentReference baseSegment;
  private final String dimension;
  private final String renamedOutputDimension;
  private final LinkedHashSet<String> allowSet;

  /**
   * @param baseSegment the segment whose rows are unnested
   * @param dimension   the column to unnest
   * @param outputName  the name of the column holding the unnested values
   * @param allowList   the values to keep, handed through to the storage adapter
   */
  public UnnestSegmentReference(
      SegmentReference baseSegment,
      String dimension,
      String outputName,
      LinkedHashSet<String> allowList
  )
  {
    this.baseSegment = baseSegment;
    this.dimension = dimension;
    this.renamedOutputDimension = outputName;
    this.allowSet = allowList;
  }

  /**
   * Acquires a reference on the base segment. The returned closeable releases it again; an empty
   * result means the reference could not be acquired.
   */
  @Override
  public Optional<Closeable> acquireReferences()
  {
    final Closer closer = Closer.create();
    try {
      final Optional<Closeable> baseReference = baseSegment.acquireReferences();
      if (baseReference.isPresent()) {
        closer.register(baseReference.get());
        return Optional.of(closer);
      }
      // The base segment could not be acquired, so there is nothing to hand out.
      CloseableUtils.closeAndWrapExceptions(closer);
      return Optional.empty();
    }
    catch (Throwable e) {
      // acquireReferences is not permitted to throw exceptions.
      CloseableUtils.closeAndSuppressExceptions(closer, e::addSuppressed);
      log.warn(e, "Exception encountered while trying to acquire reference");
      return Optional.empty();
    }
  }

  @Override
  public SegmentId getId()
  {
    return baseSegment.getId();
  }

  @Override
  public Interval getDataInterval()
  {
    return baseSegment.getDataInterval();
  }

  /**
   * Always null: this segment is only exposed through {@link #asStorageAdapter()}.
   */
  @Nullable
  @Override
  public QueryableIndex asQueryableIndex()
  {
    return null;
  }

  /**
   * Wraps the base segment's storage adapter in an adapter that performs the unnest.
   */
  @Override
  public StorageAdapter asStorageAdapter()
  {
    final StorageAdapter baseAdapter = baseSegment.asStorageAdapter();
    return new UnnestStorageAdapter(baseAdapter, dimension, renamedOutputDimension, allowSet);
  }

  @Override
  public void close() throws IOException
  {
    baseSegment.close();
  }
}

View File

@ -0,0 +1,234 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment;
import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.filter.InDimFilter;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.data.ListIndexed;
import org.apache.druid.segment.filter.AndFilter;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Objects;
/**
* This class serves as the Storage Adapter for the Unnest Segment and is responsible for creating the cursors
* If the column is dictionary encoded it creates {@link UnnestDimensionCursor} else {@link UnnestColumnValueSelectorCursor}
* These cursors help navigate the segments for these cases
*/
public class UnnestStorageAdapter implements StorageAdapter
{
private final StorageAdapter baseAdapter;
private final String dimensionToUnnest;
private final String outputColumnName;
private final LinkedHashSet<String> allowSet;
public UnnestStorageAdapter(
final StorageAdapter baseAdapter,
final String dimension,
final String outputColumnName,
final LinkedHashSet<String> allowSet
)
{
this.baseAdapter = baseAdapter;
this.dimensionToUnnest = dimension;
this.outputColumnName = outputColumnName;
this.allowSet = allowSet;
}
@Override
public Sequence<Cursor> makeCursors(
@Nullable Filter filter,
Interval interval,
VirtualColumns virtualColumns,
Granularity gran,
boolean descending,
@Nullable QueryMetrics<?> queryMetrics
)
{
Filter updatedFilter;
if (allowSet != null && !allowSet.isEmpty()) {
final InDimFilter allowListFilters;
allowListFilters = new InDimFilter(dimensionToUnnest, allowSet);
if (filter != null) {
updatedFilter = new AndFilter(Arrays.asList(filter, allowListFilters));
} else {
updatedFilter = allowListFilters;
}
} else {
updatedFilter = filter;
}
final Sequence<Cursor> baseCursorSequence = baseAdapter.makeCursors(
updatedFilter,
interval,
virtualColumns,
gran,
descending,
queryMetrics
);
return Sequences.map(
baseCursorSequence,
cursor -> {
Objects.requireNonNull(cursor);
Cursor retVal = cursor;
ColumnCapabilities capabilities = cursor.getColumnSelectorFactory().getColumnCapabilities(dimensionToUnnest);
if (capabilities != null) {
if (capabilities.isDictionaryEncoded().and(capabilities.areDictionaryValuesUnique()).isTrue()) {
retVal = new UnnestDimensionCursor(
retVal,
retVal.getColumnSelectorFactory(),
dimensionToUnnest,
outputColumnName,
allowSet
);
} else {
retVal = new UnnestColumnValueSelectorCursor(
retVal,
retVal.getColumnSelectorFactory(),
dimensionToUnnest,
outputColumnName,
allowSet
);
}
} else {
retVal = new UnnestColumnValueSelectorCursor(
retVal,
retVal.getColumnSelectorFactory(),
dimensionToUnnest,
outputColumnName,
allowSet
);
}
return retVal;
}
);
}
@Override
public Interval getInterval()
{
return baseAdapter.getInterval();
}
@Override
public Indexed<String> getAvailableDimensions()
{
final LinkedHashSet<String> availableDimensions = new LinkedHashSet<>();
for (String dim : baseAdapter.getAvailableDimensions()) {
availableDimensions.add(dim);
}
availableDimensions.add(outputColumnName);
return new ListIndexed<>(Lists.newArrayList(availableDimensions));
}
@Override
public Iterable<String> getAvailableMetrics()
{
return baseAdapter.getAvailableMetrics();
}
@Override
public int getDimensionCardinality(String column)
{
if (!outputColumnName.equals(column)) {
return baseAdapter.getDimensionCardinality(column);
}
return baseAdapter.getDimensionCardinality(dimensionToUnnest);
}
@Override
public DateTime getMinTime()
{
return baseAdapter.getMinTime();
}
@Override
public DateTime getMaxTime()
{
return baseAdapter.getMaxTime();
}
@Nullable
@Override
public Comparable getMinValue(String column)
{
if (!outputColumnName.equals(column)) {
return baseAdapter.getMinValue(column);
}
return baseAdapter.getMinValue(dimensionToUnnest);
}
@Nullable
@Override
public Comparable getMaxValue(String column)
{
if (!outputColumnName.equals(column)) {
return baseAdapter.getMaxValue(column);
}
return baseAdapter.getMaxValue(dimensionToUnnest);
}
@Nullable
@Override
public ColumnCapabilities getColumnCapabilities(String column)
{
if (!outputColumnName.equals(column)) {
return baseAdapter.getColumnCapabilities(column);
}
return baseAdapter.getColumnCapabilities(dimensionToUnnest);
}
@Override
public int getNumRows()
{
return 0;
}
@Override
public DateTime getMaxIngestedEventTime()
{
return baseAdapter.getMaxIngestedEventTime();
}
@Nullable
@Override
public Metadata getMetadata()
{
return baseAdapter.getMetadata();
}
public String getDimensionToUnnest()
{
return dimensionToUnnest;
}
}

View File

@ -0,0 +1,228 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment;
import com.google.common.base.Predicate;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.filter.ValueMatcher;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.data.IndexedInts;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.List;
/**
 * A Cursor that iterates over a user created list; each list element plays the role of one row.
 * This is used to test the base cursor of an UnnestCursor.
 * Usages can be found in tests of {@link UnnestColumnValueSelectorCursor} in {@link UnnestColumnValueSelectorCursorTest}
 * However this cannot help with {@link UnnestDimensionCursor}.
 * Tests for {@link UnnestDimensionCursor} are done alongside tests for {@link UnnestStorageAdapterTest}
 *
 * Only {@code getObject()} of the selectors returns real data (the current list element, or null once the cursor
 * has moved past the end); every other selector method returns a fixed placeholder (null, 0 or false).
 * The column name / dimension spec handed to the selector factory is ignored.
 */
public class ListCursor implements Cursor
{
  List<Object> baseList;
  // position of the current "row" inside baseList
  private int index;

  /**
   * @param inputList rows to iterate over; the list is used as is, it is not copied
   */
  public ListCursor(List<Object> inputList)
  {
    this.baseList = inputList;
  }

  @Override
  public ColumnSelectorFactory getColumnSelectorFactory()
  {
    return new ColumnSelectorFactory()
    {
      @Override
      public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec)
      {
        return new DimensionSelector()
        {
          @Override
          public IndexedInts getRow()
          {
            return null;
          }

          @Override
          public ValueMatcher makeValueMatcher(@Nullable String value)
          {
            return null;
          }

          @Override
          public ValueMatcher makeValueMatcher(Predicate<String> predicate)
          {
            return null;
          }

          @Override
          public void inspectRuntimeShape(RuntimeShapeInspector inspector)
          {

          }

          @Nullable
          @Override
          public Object getObject()
          {
            if (index < baseList.size()) {
              return baseList.get(index);
            }
            return null;
          }

          @Override
          public Class<?> classOfObject()
          {
            return null;
          }

          @Override
          public int getValueCardinality()
          {
            return 0;
          }

          @Nullable
          @Override
          public String lookupName(int id)
          {
            return null;
          }

          @Override
          public boolean nameLookupPossibleInAdvance()
          {
            return false;
          }

          @Nullable
          @Override
          public IdLookup idLookup()
          {
            return null;
          }
        };
      }

      @Override
      public ColumnValueSelector makeColumnValueSelector(String columnName)
      {
        return new ColumnValueSelector()
        {
          @Override
          public double getDouble()
          {
            return 0;
          }

          @Override
          public float getFloat()
          {
            return 0;
          }

          @Override
          public long getLong()
          {
            return 0;
          }

          @Override
          public void inspectRuntimeShape(RuntimeShapeInspector inspector)
          {

          }

          @Override
          public boolean isNull()
          {
            return false;
          }

          @Nullable
          @Override
          public Object getObject()
          {
            if (index < baseList.size()) {
              return baseList.get(index);
            }
            return null;
          }

          @Override
          public Class classOfObject()
          {
            return null;
          }
        };
      }

      @Nullable
      @Override
      public ColumnCapabilities getColumnCapabilities(String column)
      {
        // No capabilities are reported for any column.
        return null;
      }
    };
  }

  @Override
  public DateTime getTime()
  {
    return null;
  }

  @Override
  public void advance()
  {
    advanceUninterruptibly();
    BaseQuery.checkInterrupted();
  }

  @Override
  public void advanceUninterruptibly()
  {
    index++;
  }

  /**
   * @return true once the position has moved past the last list element (immediately for an empty list)
   */
  @Override
  public boolean isDone()
  {
    return index > baseList.size() - 1;
  }

  /**
   * @return true when the list is exhausted or the current thread has been interrupted.
   * This must not be a constant: a caller looping on this method would otherwise never stop.
   */
  @Override
  public boolean isDoneOrInterrupted()
  {
    return isDone() || Thread.currentThread().isInterrupted();
  }

  @Override
  public void reset()
  {
    index = 0;
  }
}

View File

@ -0,0 +1,632 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
/**
 * Tests for {@link UnnestColumnValueSelectorCursor} running on top of a {@link ListCursor}.
 * Each element of the input list is one base row; the tests check the sequence of values exposed
 * through the output column while the unnest cursor is advanced until it is done.
 */
public class UnnestColumnValueSelectorCursorTest extends InitializedNullHandlingTest
{
  // ListCursor ignores column names, so any name works for the column to unnest.
  private static final String INPUT_NAME = "dummy";
  private static final String OUTPUT_NAME = "unnested-column";
  // No allow set: the tests using it expect every value to come through.
  private static final LinkedHashSet<String> IGNORE_SET = null;
  // Allow set: the test using it expects only "b" and "f" to come through.
  private static final LinkedHashSet<String> IGNORE_SET1 = new LinkedHashSet<>(Arrays.asList("b", "f"));

  @Test
  public void test_list_unnest_cursors()
  {
    ArrayList<Object> baseList = new ArrayList<>();
    for (int i = 0; i < 2; i++) {
      List<Object> newList = new ArrayList<>();
      for (int j = 0; j < 2; j++) {
        newList.add(String.valueOf(i * 2 + j));
      }
      baseList.add(newList);
    }
    List<String> expectedResults = Arrays.asList("0", "1", "2", "3");

    assertUnnestedObjects(makeUnnestCursor(baseList, IGNORE_SET), OUTPUT_NAME, expectedResults);
  }

  @Test
  public void test_list_unnest_cursors_user_supplied_list()
  {
    List<Object> inputList = Arrays.asList(
        Arrays.asList("a", "b", "c"),
        Arrays.asList("e", "f", "g", "h", "i"),
        Collections.singletonList("j")
    );
    List<String> expectedResults = Arrays.asList("a", "b", "c", "e", "f", "g", "h", "i", "j");

    assertUnnestedObjects(makeUnnestCursor(inputList, IGNORE_SET), OUTPUT_NAME, expectedResults);
  }

  @Test
  public void test_list_unnest_cursors_user_supplied_list_only_nulls()
  {
    List<Object> inputList = Arrays.asList(
        Collections.singletonList(null),
        Arrays.asList(null, null),
        Collections.singletonList(null)
    );
    List<String> expectedResults = Arrays.asList(null, null, null, null);

    assertUnnestedObjects(makeUnnestCursor(inputList, IGNORE_SET), OUTPUT_NAME, expectedResults);
  }

  @Test
  public void test_list_unnest_cursors_user_supplied_list_mixed_with_nulls()
  {
    List<Object> inputList = Arrays.asList(
        Arrays.asList("a", "b"),
        Arrays.asList("b", "c"),
        "d",
        null,
        null,
        null
    );
    List<String> expectedResults = Arrays.asList("a", "b", "b", "c", "d", null, null, null);

    assertUnnestedObjects(makeUnnestCursor(inputList, IGNORE_SET), OUTPUT_NAME, expectedResults);
  }

  @Test
  public void test_list_unnest_cursors_user_supplied_strings_and_no_lists()
  {
    List<Object> inputList = Arrays.asList("a", "b", "c", "e", "f", "g", "h", "i", "j");
    List<String> expectedResults = Arrays.asList("a", "b", "c", "e", "f", "g", "h", "i", "j");

    assertUnnestedObjects(makeUnnestCursor(inputList, IGNORE_SET), OUTPUT_NAME, expectedResults);
  }

  @Test
  public void test_list_unnest_cursors_user_supplied_strings_mixed_with_list()
  {
    List<Object> inputList = Arrays.asList("a", "b", "c", "e", "f", Arrays.asList("g", "h"), "i", "j");
    List<String> expectedResults = Arrays.asList("a", "b", "c", "e", "f", "g", "h", "i", "j");

    assertUnnestedObjects(makeUnnestCursor(inputList, IGNORE_SET), OUTPUT_NAME, expectedResults);
  }

  @Test
  public void test_list_unnest_cursors_user_supplied_lists_three_levels()
  {
    List<Object> inputList = Arrays.asList(
        Arrays.asList("a", "b", "c"),
        Arrays.asList("e", "f", "g", "h", "i"),
        Arrays.asList("j", Arrays.asList("a", "b"))
    );
    // Only one level is unnested, so the innermost list is expected to come out as a single value.
    List<Object> expectedResults = Arrays.asList("a", "b", "c", "e", "f", "g", "h", "i", "j", Arrays.asList("a", "b"));

    assertUnnestedObjects(makeUnnestCursor(inputList, IGNORE_SET), OUTPUT_NAME, expectedResults);
  }

  @Test
  public void test_list_unnest_of_unnest_cursors_user_supplied_list_three_levels()
  {
    List<Object> inputList = Arrays.asList(
        Arrays.asList("a", "b", "c"),
        Arrays.asList("e", "f", "g", "h", "i"),
        Arrays.asList("j", Arrays.asList("a", "b"))
    );
    List<Object> expectedResults = Arrays.asList("a", "b", "c", "e", "f", "g", "h", "i", "j", "a", "b");

    UnnestColumnValueSelectorCursor childCursor = makeUnnestCursor(inputList, IGNORE_SET);
    // Second unnest, applied on the output column of the first one.
    UnnestColumnValueSelectorCursor parentCursor = new UnnestColumnValueSelectorCursor(
        childCursor,
        childCursor.getColumnSelectorFactory(),
        OUTPUT_NAME,
        "tmp-out",
        IGNORE_SET
    );

    assertUnnestedObjects(parentCursor, "tmp-out", expectedResults);
  }

  @Test
  public void test_list_unnest_cursors_user_supplied_list_with_nulls()
  {
    List<Object> inputList = Arrays.asList(
        Arrays.asList("a", "b", "c"),
        Arrays.asList("e", "f", "g", "h", "i", null),
        Collections.singletonList("j")
    );
    List<Object> expectedResults = Arrays.asList("a", "b", "c", "e", "f", "g", "h", "i", null, "j");

    assertUnnestedObjects(makeUnnestCursor(inputList, IGNORE_SET), OUTPUT_NAME, expectedResults);
  }

  @Test
  public void test_list_unnest_cursors_user_supplied_list_with_dups()
  {
    List<Object> inputList = Arrays.asList(
        Arrays.asList("a", "a", "a"),
        Arrays.asList("e", "f", null, "h", "i", null),
        Collections.singletonList("j")
    );
    List<Object> expectedResults = Arrays.asList("a", "a", "a", "e", "f", null, "h", "i", null, "j");

    assertUnnestedObjects(makeUnnestCursor(inputList, IGNORE_SET), OUTPUT_NAME, expectedResults);
  }

  @Test
  public void test_list_unnest_cursors_user_supplied_list_with_ignore_set()
  {
    List<Object> inputList = Arrays.asList(
        Arrays.asList("a", "b", "c"),
        Arrays.asList("e", "f", "g", "h", "i"),
        Collections.singletonList("j")
    );
    List<String> expectedResults = Arrays.asList("b", "f");

    assertUnnestedObjects(makeUnnestCursor(inputList, IGNORE_SET1), OUTPUT_NAME, expectedResults);
  }

  @Test
  public void test_list_unnest_cursors_user_supplied_list_double()
  {
    List<Object> inputList = Arrays.asList(
        Arrays.asList(1, 2, 3),
        Arrays.asList(4, 5, 6, 7, 8),
        Collections.singletonList(9)
    );
    List<Double> expectedResults = Arrays.asList(1d, 2d, 3d, 4d, 5d, 6d, 7d, 8d, 9d);

    UnnestColumnValueSelectorCursor unnestCursor = makeUnnestCursor(inputList, IGNORE_SET);
    ColumnValueSelector unnestColumnValueSelector = unnestCursor.getColumnSelectorFactory()
                                                                .makeColumnValueSelector(OUTPUT_NAME);
    int k = 0;
    while (!unnestCursor.isDone()) {
      Double valueSelectorVal = unnestColumnValueSelector.getDouble();
      Assert.assertEquals(expectedResults.get(k), valueSelectorVal);
      k++;
      unnestCursor.advance();
    }
    Assert.assertEquals(9, k);
  }

  @Test
  public void test_list_unnest_cursors_user_supplied_list_float()
  {
    List<Object> inputList = Arrays.asList(
        Arrays.asList(1, 2, 3),
        Arrays.asList(4, 5, 6, 7, 8),
        Collections.singletonList(9)
    );
    List<Float> expectedResults = Arrays.asList(1f, 2f, 3f, 4f, 5f, 6f, 7f, 8f, 9f);

    UnnestColumnValueSelectorCursor unnestCursor = makeUnnestCursor(inputList, IGNORE_SET);
    ColumnValueSelector unnestColumnValueSelector = unnestCursor.getColumnSelectorFactory()
                                                                .makeColumnValueSelector(OUTPUT_NAME);
    int k = 0;
    while (!unnestCursor.isDone()) {
      Float valueSelectorVal = unnestColumnValueSelector.getFloat();
      Assert.assertEquals(expectedResults.get(k), valueSelectorVal);
      k++;
      unnestCursor.advance();
    }
    Assert.assertEquals(9, k);
  }

  @Test
  public void test_list_unnest_cursors_user_supplied_list_long()
  {
    List<Object> inputList = Arrays.asList(
        Arrays.asList(1, 2, 3),
        Arrays.asList(4, 5, 6, 7, 8),
        Collections.singletonList(9)
    );
    List<Long> expectedResults = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);

    UnnestColumnValueSelectorCursor unnestCursor = makeUnnestCursor(inputList, IGNORE_SET);
    ColumnValueSelector unnestColumnValueSelector = unnestCursor.getColumnSelectorFactory()
                                                                .makeColumnValueSelector(OUTPUT_NAME);
    int k = 0;
    while (!unnestCursor.isDone()) {
      Object obj = unnestColumnValueSelector.getObject();
      Assert.assertNotNull(obj);
      Long valueSelectorVal = unnestColumnValueSelector.getLong();
      Assert.assertEquals(expectedResults.get(k), valueSelectorVal);
      k++;
      unnestCursor.advance();
    }
    Assert.assertEquals(9, k);
  }

  @Test
  public void test_list_unnest_cursors_user_supplied_list_three_level_arrays_and_methods()
  {
    List<Object> inputList = Arrays.asList(
        Arrays.asList("a", "b", "c"),
        Arrays.asList("e", "f", "g", "h", "i"),
        Arrays.asList("j", Arrays.asList("a", "b"))
    );
    List<Object> expectedResults = Arrays.asList("a", "b", "c", "e", "f", "g", "h", "i", "j", Arrays.asList("a", "b"));

    UnnestColumnValueSelectorCursor unnestCursor = makeUnnestCursor(inputList, IGNORE_SET);
    assertUnnestedObjects(unnestCursor, OUTPUT_NAME, expectedResults);

    // After a reset the cursor must be usable again.
    unnestCursor.reset();
    Assert.assertFalse(unnestCursor.isDoneOrInterrupted());
  }

  @Test(expected = UOE.class)
  public void test_list_unnest_cursors_dimSelector()
  {
    List<Object> inputList = Arrays.asList(
        Arrays.asList("a", "b", "c"),
        Arrays.asList("e", "f", "g", "h", "i"),
        Collections.singletonList("j")
    );

    UnnestColumnValueSelectorCursor unnestCursor = makeUnnestCursor(inputList, IGNORE_SET);
    // Asking for a dimension selector is expected to throw.
    unnestCursor.getColumnSelectorFactory().makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_NAME));
  }

  @Test
  public void test_list_unnest_cursors_user_supplied_list_of_integers()
  {
    List<Object> inputList = Arrays.asList(
        Arrays.asList(1, 2, 3),
        Arrays.asList(4, 5, 6, 7, 8),
        Collections.singletonList(9)
    );
    List<Integer> expectedResults = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);

    assertUnnestedObjects(makeUnnestCursor(inputList, IGNORE_SET), OUTPUT_NAME, expectedResults);
  }

  /**
   * Creates the cursor under test: an unnest cursor reading {@link #INPUT_NAME} from a {@link ListCursor} over
   * the given rows and exposing the unnested values as {@link #OUTPUT_NAME}.
   */
  private static UnnestColumnValueSelectorCursor makeUnnestCursor(
      List<Object> inputList,
      LinkedHashSet<String> allowSet
  )
  {
    final ListCursor listCursor = new ListCursor(inputList);
    return new UnnestColumnValueSelectorCursor(
        listCursor,
        listCursor.getColumnSelectorFactory(),
        INPUT_NAME,
        OUTPUT_NAME,
        allowSet
    );
  }

  /**
   * Walks the cursor until it is done and checks that {@code getObject()} of the given output column yields
   * exactly the expected values, in order. Values are compared through their string form and null is only
   * equal to null.
   */
  private static void assertUnnestedObjects(Cursor unnestCursor, String outputColumn, List<?> expectedResults)
  {
    final ColumnValueSelector selector = unnestCursor.getColumnSelectorFactory()
                                                     .makeColumnValueSelector(outputColumn);
    int k = 0;
    while (!unnestCursor.isDone()) {
      final Object expected = expectedResults.get(k);
      final Object actual = selector.getObject();
      Assert.assertEquals(
          "unnested value at position " + k,
          expected == null ? null : expected.toString(),
          actual == null ? null : actual.toString()
      );
      k++;
      unnestCursor.advance();
    }
    Assert.assertEquals(expectedResults.size(), k);
  }
}

View File

@ -0,0 +1,399 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment;
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.generator.SegmentGenerator;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.apache.druid.utils.CloseableUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
public class UnnestStorageAdapterTest extends InitializedNullHandlingTest
{
// Shared fixtures: the non-constant fields are assigned once in setup() and CLOSER is closed in teardown().
private static Closer CLOSER;
private static IncrementalIndex INCREMENTAL_INDEX;
private static IncrementalIndexStorageAdapter INCREMENTAL_INDEX_STORAGE_ADAPTER;
// Unnest of COLUMNNAME over the incremental index, without an allow set.
private static UnnestStorageAdapter UNNEST_STORAGE_ADAPTER;
// Unnest of COLUMNNAME over the incremental index, restricted to IGNORE_SET.
private static UnnestStorageAdapter UNNEST_STORAGE_ADAPTER1;
// Unnest of COLUMNNAME stacked on UNNEST_STORAGE_ADAPTER, without an allow set.
private static UnnestStorageAdapter UNNEST_STORAGE_ADAPTER2;
// Unnest of COLUMNNAME stacked on UNNEST_STORAGE_ADAPTER1, restricted to IGNORE_SET.
private static UnnestStorageAdapter UNNEST_STORAGE_ADAPTER3;
// The four unnest adapters above, in declaration order.
private static List<StorageAdapter> ADAPTERS;
// Name of the column that gets unnested.
private static String COLUMNNAME = "multi-string1";
// Output column name used by the two adapters built directly on the incremental index.
private static String OUTPUT_COLUMN_NAME = "unnested-multi-string1";
// Output column name used by the two stacked adapters.
private static String OUTPUT_COLUMN_NAME1 = "unnested-multi-string1-again";
// Despite the name, this is handed to the adapters as their allow set.
private static LinkedHashSet<String> IGNORE_SET = new LinkedHashSet<>(Arrays.asList("1", "3", "5"));
/**
 * Builds the shared fixtures once for the whole class: a two-row incremental index generated from the
 * "expression-testbench" schema, its storage adapter, and four unnest adapters on top of it
 * (with / without allow set, directly on the index / stacked on another unnest adapter).
 */
@BeforeClass
public static void setup()
{
  CLOSER = Closer.create();

  final GeneratorSchemaInfo testbenchSchema = GeneratorBasicSchemas.SCHEMA_MAP.get("expression-testbench");
  final DataSegment segmentDescriptor = DataSegment.builder()
                                                   .dataSource("foo")
                                                   .interval(testbenchSchema.getDataInterval())
                                                   .version("1")
                                                   .shardSpec(new LinearShardSpec(0))
                                                   .size(0)
                                                   .build();

  // Generator and index are registered with CLOSER so that teardown() releases them.
  final SegmentGenerator generator = CLOSER.register(new SegmentGenerator());
  final int rowCount = 2;
  INCREMENTAL_INDEX = CLOSER.register(
      generator.generateIncrementalIndex(segmentDescriptor, testbenchSchema, Granularities.HOUR, rowCount)
  );
  INCREMENTAL_INDEX_STORAGE_ADAPTER = new IncrementalIndexStorageAdapter(INCREMENTAL_INDEX);

  // Unnest directly on the incremental index: first without, then with the allow set.
  UNNEST_STORAGE_ADAPTER = new UnnestStorageAdapter(
      INCREMENTAL_INDEX_STORAGE_ADAPTER,
      COLUMNNAME,
      OUTPUT_COLUMN_NAME,
      null
  );
  UNNEST_STORAGE_ADAPTER1 = new UnnestStorageAdapter(
      INCREMENTAL_INDEX_STORAGE_ADAPTER,
      COLUMNNAME,
      OUTPUT_COLUMN_NAME,
      IGNORE_SET
  );

  // Unnest stacked on each of the adapters above, writing to a second output column.
  UNNEST_STORAGE_ADAPTER2 = new UnnestStorageAdapter(
      UNNEST_STORAGE_ADAPTER,
      COLUMNNAME,
      OUTPUT_COLUMN_NAME1,
      null
  );
  UNNEST_STORAGE_ADAPTER3 = new UnnestStorageAdapter(
      UNNEST_STORAGE_ADAPTER1,
      COLUMNNAME,
      OUTPUT_COLUMN_NAME1,
      IGNORE_SET
  );

  ADAPTERS = ImmutableList.of(
      UNNEST_STORAGE_ADAPTER,
      UNNEST_STORAGE_ADAPTER1,
      UNNEST_STORAGE_ADAPTER2,
      UNNEST_STORAGE_ADAPTER3
  );
}
@AfterClass
public static void teardown()
{
CloseableUtils.closeAndSuppressExceptions(CLOSER, throwable -> {
});
}
@Test
public void test_group_of_unnest_adapters_methods()
{
String colName = "multi-string1";
for (StorageAdapter adapter : ADAPTERS) {
Assert.assertEquals(
DateTimes.of("2000-01-01T23:00:00.000Z"),
adapter.getMaxTime()
);
Assert.assertEquals(
DateTimes.of("2000-01-01T12:00:00.000Z"),
adapter.getMinTime()
);
adapter.getColumnCapabilities(colName);
Assert.assertEquals(adapter.getNumRows(), 0);
Assert.assertNotNull(adapter.getMetadata());
Assert.assertEquals(
DateTimes.of("2000-01-01T23:59:59.999Z"),
adapter.getMaxIngestedEventTime()
);
Assert.assertEquals(
adapter.getColumnCapabilities(colName).toColumnType(),
INCREMENTAL_INDEX_STORAGE_ADAPTER.getColumnCapabilities(colName).toColumnType()
);
Assert.assertEquals(((UnnestStorageAdapter) adapter).getDimensionToUnnest(), colName);
}
}
@Test
public void test_group_of_unnest_adapters_column_capabilities()
{
String colName = "multi-string1";
List<String> columnsInTable = Arrays.asList(
"string1",
"long1",
"double1",
"float1",
"multi-string1",
OUTPUT_COLUMN_NAME
);
List<ValueType> valueTypes = Arrays.asList(
ValueType.STRING,
ValueType.LONG,
ValueType.DOUBLE,
ValueType.FLOAT,
ValueType.STRING,
ValueType.STRING
);
UnnestStorageAdapter adapter = UNNEST_STORAGE_ADAPTER;
for (int i = 0; i < columnsInTable.size(); i++) {
ColumnCapabilities capabilities = adapter.getColumnCapabilities(columnsInTable.get(i));
Assert.assertEquals(capabilities.getType(), valueTypes.get(i));
}
Assert.assertEquals(adapter.getDimensionToUnnest(), colName);
}
@Test
public void test_unnest_adapters_basic()
{
Sequence<Cursor> cursorSequence = UNNEST_STORAGE_ADAPTER.makeCursors(
null,
UNNEST_STORAGE_ADAPTER.getInterval(),
VirtualColumns.EMPTY,
Granularities.ALL,
false,
null
);
cursorSequence.accumulate(null, (accumulated, cursor) -> {
ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
DimensionSelector dimSelector = factory.makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_COLUMN_NAME));
int count = 0;
while (!cursor.isDone()) {
Object dimSelectorVal = dimSelector.getObject();
if (dimSelectorVal == null) {
Assert.assertNull(dimSelectorVal);
}
cursor.advance();
count++;
}
/*
each row has 8 entries.
unnest 2 rows -> 16 rows after unnest
*/
Assert.assertEquals(count, 16);
return null;
});
}
@Test
public void test_two_levels_of_unnest_adapters()
{
Sequence<Cursor> cursorSequence = UNNEST_STORAGE_ADAPTER2.makeCursors(
null,
UNNEST_STORAGE_ADAPTER2.getInterval(),
VirtualColumns.EMPTY,
Granularities.ALL,
false,
null
);
cursorSequence.accumulate(null, (accumulated, cursor) -> {
ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
DimensionSelector dimSelector = factory.makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_COLUMN_NAME1));
ColumnValueSelector valueSelector = factory.makeColumnValueSelector(OUTPUT_COLUMN_NAME1);
int count = 0;
while (!cursor.isDone()) {
Object dimSelectorVal = dimSelector.getObject();
Object valueSelectorVal = valueSelector.getObject();
if (dimSelectorVal == null) {
Assert.assertNull(dimSelectorVal);
} else if (valueSelectorVal == null) {
Assert.assertNull(valueSelectorVal);
}
cursor.advance();
count++;
}
/*
each row has 8 entries.
unnest 2 rows -> 16 entries also the value cardinality
unnest of unnest -> 16*8 = 128 rows
*/
Assert.assertEquals(count, 128);
Assert.assertEquals(dimSelector.getValueCardinality(), 16);
return null;
});
}
@Test
public void test_unnest_adapters_with_allowList()
{
final String columnName = "multi-string1";
Sequence<Cursor> cursorSequence = UNNEST_STORAGE_ADAPTER1.makeCursors(
null,
UNNEST_STORAGE_ADAPTER1.getInterval(),
VirtualColumns.EMPTY,
Granularities.ALL,
false,
null
);
cursorSequence.accumulate(null, (accumulated, cursor) -> {
ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
DimensionSelector dimSelector = factory.makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_COLUMN_NAME));
ColumnValueSelector valueSelector = factory.makeColumnValueSelector(OUTPUT_COLUMN_NAME);
int count = 0;
while (!cursor.isDone()) {
Object dimSelectorVal = dimSelector.getObject();
Object valueSelectorVal = valueSelector.getObject();
if (dimSelectorVal == null) {
Assert.assertNull(dimSelectorVal);
} else if (valueSelectorVal == null) {
Assert.assertNull(valueSelectorVal);
}
cursor.advance();
count++;
}
/*
each row has 8 distinct entries.
allowlist has 3 entries also the value cardinality
unnest will have 3 distinct entries
*/
Assert.assertEquals(count, 3);
Assert.assertEquals(dimSelector.getValueCardinality(), 3);
return null;
});
}
@Test
public void test_two_levels_of_unnest_adapters_with_allowList()
{
final String columnName = "multi-string1";
Sequence<Cursor> cursorSequence = UNNEST_STORAGE_ADAPTER3.makeCursors(
null,
UNNEST_STORAGE_ADAPTER3.getInterval(),
VirtualColumns.EMPTY,
Granularities.ALL,
false,
null
);
UnnestStorageAdapter adapter = UNNEST_STORAGE_ADAPTER3;
Assert.assertEquals(adapter.getDimensionToUnnest(), columnName);
Assert.assertEquals(
adapter.getColumnCapabilities(OUTPUT_COLUMN_NAME).isDictionaryEncoded(),
ColumnCapabilities.Capable.TRUE
);
Assert.assertEquals(adapter.getMaxValue(columnName), adapter.getMaxValue(OUTPUT_COLUMN_NAME));
Assert.assertEquals(adapter.getMinValue(columnName), adapter.getMinValue(OUTPUT_COLUMN_NAME));
cursorSequence.accumulate(null, (accumulated, cursor) -> {
ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
DimensionSelector dimSelector = factory.makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_COLUMN_NAME1));
ColumnValueSelector valueSelector = factory.makeColumnValueSelector(OUTPUT_COLUMN_NAME1);
int count = 0;
while (!cursor.isDone()) {
Object dimSelectorVal = dimSelector.getObject();
Object valueSelectorVal = valueSelector.getObject();
if (dimSelectorVal == null) {
Assert.assertNull(dimSelectorVal);
} else if (valueSelectorVal == null) {
Assert.assertNull(valueSelectorVal);
}
cursor.advance();
count++;
}
/*
each row has 8 distinct entries.
allowlist has 3 entries also the value cardinality
unnest will have 3 distinct entries
unnest of that unnest will have 3*3 = 9 entries
*/
Assert.assertEquals(count, 9);
Assert.assertEquals(dimSelector.getValueCardinality(), 3);
return null;
});
}
@Test
public void test_unnest_adapters_methods_with_allowList()
{
final String columnName = "multi-string1";
Sequence<Cursor> cursorSequence = UNNEST_STORAGE_ADAPTER1.makeCursors(
null,
UNNEST_STORAGE_ADAPTER1.getInterval(),
VirtualColumns.EMPTY,
Granularities.ALL,
false,
null
);
UnnestStorageAdapter adapter = UNNEST_STORAGE_ADAPTER1;
Assert.assertEquals(adapter.getDimensionToUnnest(), columnName);
Assert.assertEquals(
adapter.getColumnCapabilities(OUTPUT_COLUMN_NAME).isDictionaryEncoded(),
ColumnCapabilities.Capable.TRUE
);
Assert.assertEquals(adapter.getMaxValue(columnName), adapter.getMaxValue(OUTPUT_COLUMN_NAME));
Assert.assertEquals(adapter.getMinValue(columnName), adapter.getMinValue(OUTPUT_COLUMN_NAME));
cursorSequence.accumulate(null, (accumulated, cursor) -> {
ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
DimensionSelector dimSelector = factory.makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_COLUMN_NAME));
IdLookup idlookUp = dimSelector.idLookup();
Assert.assertFalse(dimSelector.isNull());
int[] indices = new int[]{1, 3, 5};
int count = 0;
while (!cursor.isDone()) {
Object dimSelectorVal = dimSelector.getObject();
Assert.assertEquals(idlookUp.lookupId((String) dimSelectorVal), indices[count]);
// after unnest first entry in get row should equal the object
// and the row size will always be 1
Assert.assertEquals(dimSelector.getRow().get(0), indices[count]);
Assert.assertEquals(dimSelector.getRow().size(), 1);
Assert.assertNotNull(dimSelector.makeValueMatcher(OUTPUT_COLUMN_NAME));
cursor.advance();
count++;
}
Assert.assertEquals(dimSelector.getValueCardinality(), 3);
Assert.assertEquals(count, 3);
return null;
});
}
}