From 91774196285ea51b823883394ef1bfe8f2417892 Mon Sep 17 00:00:00 2001 From: somu-imply <93540295+somu-imply@users.noreply.github.com> Date: Fri, 2 Dec 2022 18:48:25 -0800 Subject: [PATCH] Unnest functionality for Druid (#13268) * Moving all unnest cursor code atop refactored code for unnest * Updating unnest cursor * Removing dedup and fixing up some null checks * AllowList changes * Fixing some NPEs * Using bitset for allowlist * Updating the initialization only when cursor is in non-done state * Updating code to skip rows not in allow list * Adding a flag for cases when first element is not in allowed list * Updating for a null in allowList * Splitting unnest cursor into 2 subclasses * Intercepting some apis with columnName for new unnested column * Adding test cases and renaming some stuff * checkstyle fixes * Moving to an interface for Unnest * handling null rows in a dimension * Updating cursors after comments part-1 * Addressing comments and adding some more tests * Reverting a change to ScanQueryRunner and improving a comment * removing an unused function * Updating cursors after comments part 2 * One last fix for review comments * Making some functions private, deleting some comments, adding a test for unnest of unnest with allowList * Adding an exception for a case * Closure for unnest data source * Adding some javadocs * One minor change in makeDimSelector of columnarCursor * Updating an error message * Update processing/src/main/java/org/apache/druid/segment/DimensionUnnestCursor.java Co-authored-by: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com> * Unnesting on virtual columns was missing an object array, adding that to support virtual columns unnesting * Updating exceptions to use UOE * Renamed files, added column capability test on adapter, return statement and made unnest datasource not cacheable for the time being * Handling for null values in dim selector * Fixing a NPE for null row * Updating capabilities * Updating 
capabilities Co-authored-by: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com> --- .../org/apache/druid/query/DataSource.java | 3 +- .../apache/druid/query/UnnestDataSource.java | 212 ++++++ .../query/planning/DataSourceAnalysis.java | 39 +- .../UnnestColumnValueSelectorCursor.java | 336 ++++++++++ .../druid/segment/UnnestDimensionCursor.java | 415 ++++++++++++ .../druid/segment/UnnestSegmentReference.java | 115 ++++ .../druid/segment/UnnestStorageAdapter.java | 234 +++++++ .../org/apache/druid/segment/ListCursor.java | 228 +++++++ .../UnnestColumnValueSelectorCursorTest.java | 632 ++++++++++++++++++ .../segment/UnnestStorageAdapterTest.java | 399 +++++++++++ 10 files changed, 2602 insertions(+), 11 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/query/UnnestDataSource.java create mode 100644 processing/src/main/java/org/apache/druid/segment/UnnestColumnValueSelectorCursor.java create mode 100644 processing/src/main/java/org/apache/druid/segment/UnnestDimensionCursor.java create mode 100644 processing/src/main/java/org/apache/druid/segment/UnnestSegmentReference.java create mode 100644 processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java create mode 100644 processing/src/test/java/org/apache/druid/segment/ListCursor.java create mode 100644 processing/src/test/java/org/apache/druid/segment/UnnestColumnValueSelectorCursorTest.java create mode 100644 processing/src/test/java/org/apache/druid/segment/UnnestStorageAdapterTest.java diff --git a/processing/src/main/java/org/apache/druid/query/DataSource.java b/processing/src/main/java/org/apache/druid/query/DataSource.java index f56a3550a34..43dfb3be85c 100644 --- a/processing/src/main/java/org/apache/druid/query/DataSource.java +++ b/processing/src/main/java/org/apache/druid/query/DataSource.java @@ -41,7 +41,8 @@ import java.util.function.Function; @JsonSubTypes.Type(value = JoinDataSource.class, name = "join"), @JsonSubTypes.Type(value = 
LookupDataSource.class, name = "lookup"), @JsonSubTypes.Type(value = InlineDataSource.class, name = "inline"), - @JsonSubTypes.Type(value = GlobalTableDataSource.class, name = "globalTable") + @JsonSubTypes.Type(value = GlobalTableDataSource.class, name = "globalTable"), + @JsonSubTypes.Type(value = UnnestDataSource.class, name = "unnest") }) public interface DataSource { diff --git a/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java b/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java new file mode 100644 index 00000000000..46237016743 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.segment.SegmentReference; +import org.apache.druid.segment.UnnestSegmentReference; +import org.apache.druid.utils.JvmUtils; + +import javax.annotation.Nullable; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; + +/** + * The data source for representing an unnest operation. + * + * An unnest data source has the following: + * a base data source which is to be unnested + * the column name of the MVD which will be unnested + * the name of the column that will hold the unnested values + * and an allowlist serving as a filter of which values in the MVD will be unnested. + */ +public class UnnestDataSource implements DataSource +{ + private final DataSource base; + private final String column; + private final String outputName; + private final LinkedHashSet allowList; + + private UnnestDataSource( + DataSource dataSource, + String columnName, + String outputName, + LinkedHashSet allowList + ) + { + this.base = dataSource; + this.column = columnName; + this.outputName = outputName; + this.allowList = allowList; + } + + @JsonCreator + public static UnnestDataSource create( + @JsonProperty("base") DataSource base, + @JsonProperty("column") String columnName, + @JsonProperty("outputName") String outputName, + @Nullable @JsonProperty("allowList") LinkedHashSet allowList + ) + { + return new UnnestDataSource(base, columnName, outputName, allowList); + } + + @JsonProperty("base") + public DataSource getBase() + { + return base; + } + + @JsonProperty("column") + public String getColumn() + { + return column; + } + + @JsonProperty("outputName") + public String 
getOutputName() + { + return outputName; + } + + @JsonProperty("allowList") + public LinkedHashSet getAllowList() + { + return allowList; + } + + @Override + public Set getTableNames() + { + return base.getTableNames(); + } + + @Override + public List getChildren() + { + return ImmutableList.of(base); + } + + @Override + public DataSource withChildren(List children) + { + if (children.size() != 1) { + throw new IAE("Expected [1] child, got [%d]", children.size()); + } + return new UnnestDataSource(children.get(0), column, outputName, allowList); + } + + @Override + public boolean isCacheable(boolean isBroker) + { + return false; + } + + @Override + public boolean isGlobal() + { + return base.isGlobal(); + } + + @Override + public boolean isConcrete() + { + return base.isConcrete(); + } + + @Override + public Function createSegmentMapFunction( + Query query, + AtomicLong cpuTimeAccumulator + ) + { + final Function segmentMapFn = base.createSegmentMapFunction( + query, + cpuTimeAccumulator + ); + return JvmUtils.safeAccumulateThreadCpuTime( + cpuTimeAccumulator, + () -> { + if (column == null) { + return segmentMapFn; + } else if (column.isEmpty()) { + return segmentMapFn; + } else { + return + baseSegment -> + new UnnestSegmentReference( + segmentMapFn.apply(baseSegment), + column, + outputName, + allowList + ); + } + } + ); + + } + + @Override + public DataSource withUpdatedDataSource(DataSource newSource) + { + return new UnnestDataSource(newSource, column, outputName, allowList); + } + + @Override + public byte[] getCacheKey() + { + // The column being unnested would need to be part of the cache key + // as the results are dependent on what column is being unnested. + // Currently, it is not cacheable. 
+ // Future development should use the table name and column name to + // create an appropriate cache key. + return null; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + UnnestDataSource that = (UnnestDataSource) o; + return column.equals(that.column) + && outputName.equals(that.outputName) + && base.equals(that.base); + } + + @Override + public int hashCode() + { + return Objects.hash(base, column, outputName); + } +} + + diff --git a/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java b/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java index c329e3a5708..63c2c8b815d 100644 --- a/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java +++ b/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java @@ -28,6 +28,7 @@ import org.apache.druid.query.Query; import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.UnionDataSource; +import org.apache.druid.query.UnnestDataSource; import org.apache.druid.query.filter.DimFilter; import org.apache.druid.query.spec.QuerySegmentSpec; @@ -112,17 +113,29 @@ public class DataSourceAnalysis Query baseQuery = null; DataSource current = dataSource; - while (current instanceof QueryDataSource) { - final Query subQuery = ((QueryDataSource) current).getQuery(); + // This needs to be an or condition between QueryDataSource and UnnestDataSource + // As queries can have interleaving query and unnest data sources. + // Ideally if each data source generate their own analysis object we can avoid the or here + // and have cleaner code. Especially as we increase the types of data sources in future + // these or checks will be tedious. Future development should move forDataSource method + // into each data source.
- if (!(subQuery instanceof BaseQuery)) { - // We must verify that the subQuery is a BaseQuery, because it is required to make "getBaseQuerySegmentSpec" - // work properly. All built-in query types are BaseQuery, so we only expect this with funky extension queries. - throw new IAE("Cannot analyze subquery of class[%s]", subQuery.getClass().getName()); + while (current instanceof QueryDataSource || current instanceof UnnestDataSource) { + if (current instanceof QueryDataSource) { + final Query subQuery = ((QueryDataSource) current).getQuery(); + + if (!(subQuery instanceof BaseQuery)) { + // We must verify that the subQuery is a BaseQuery, because it is required to make "getBaseQuerySegmentSpec" + // work properly. All built-in query types are BaseQuery, so we only expect this with funky extension queries. + throw new IAE("Cannot analyze subquery of class[%s]", subQuery.getClass().getName()); + } + + baseQuery = subQuery; + current = subQuery.getDataSource(); + } else { + final UnnestDataSource unnestDataSource = (UnnestDataSource) current; + current = unnestDataSource.getBase(); } - - baseQuery = subQuery; - current = subQuery.getDataSource(); } if (current instanceof JoinDataSource) { @@ -276,7 +289,8 @@ public class DataSourceAnalysis /** * Returns true if this datasource is concrete-based (see {@link #isConcreteBased()}, and the base datasource is a - * {@link TableDataSource} or a {@link UnionDataSource} composed entirely of {@link TableDataSource}. This is an + * {@link TableDataSource} or a {@link UnionDataSource} composed entirely of {@link TableDataSource} + * or an {@link UnnestDataSource} composed entirely of {@link TableDataSource} . This is an * important property, because it corresponds to datasources that can be handled by Druid's distributed query stack. */ public boolean isConcreteTableBased() @@ -286,6 +300,10 @@ public class DataSourceAnalysis // so check anyway for future-proofing. 
return isConcreteBased() && (baseDataSource instanceof TableDataSource || (baseDataSource instanceof UnionDataSource && + baseDataSource.getChildren() + .stream() + .allMatch(ds -> ds instanceof TableDataSource)) + || (baseDataSource instanceof UnnestDataSource && baseDataSource.getChildren() .stream() .allMatch(ds -> ds instanceof TableDataSource))); @@ -298,6 +316,7 @@ public class DataSourceAnalysis { return dataSource instanceof QueryDataSource; } + /** * Returns true if this datasource is made out of a join operation diff --git a/processing/src/main/java/org/apache/druid/segment/UnnestColumnValueSelectorCursor.java b/processing/src/main/java/org/apache/druid/segment/UnnestColumnValueSelectorCursor.java new file mode 100644 index 00000000000..db4acb893ea --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/UnnestColumnValueSelectorCursor.java @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment; + +import org.apache.druid.java.util.common.UOE; +import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnCapabilitiesImpl; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashSet; +import java.util.List; + +/** + * The cursor to help unnest MVDs without dictionary encoding and ARRAY type selectors. + *

+ * Consider a segment has 2 rows + * ['a', 'b', 'c'] + * ['d', 'e'] + *

+ * The baseCursor points to the row ['a', 'b', 'c'] + * while the unnestCursor with each call of advance() moves over individual elements. + *

+ * unnestCursor.advance() -> 'a' + * unnestCursor.advance() -> 'b' + * unnestCursor.advance() -> 'c' + * unnestCursor.advance() -> 'd' (advances base cursor first) + * unnestCursor.advance() -> 'e' + *

+ *

+ * The allowSet if available helps skip over elements which are not in the allowList by moving the cursor to + * the next available match. + *

+ * The index reference points to the index of each row that the unnest cursor is accessing through currentVal + * The index ranges from 0 to the size of the list in each row which is held in the unnestListForCurrentRow + *

+ * The needInitialization flag sets up the initial values of unnestListForCurrentRow at the beginning of the segment + */ +public class UnnestColumnValueSelectorCursor implements Cursor +{ + private final Cursor baseCursor; + private final ColumnSelectorFactory baseColumnSelectorFactory; + private final ColumnValueSelector columnValueSelector; + private final String columnName; + private final String outputName; + private final LinkedHashSet allowSet; + private int index; + private Object currentVal; + private List unnestListForCurrentRow; + private boolean needInitialization; + + public UnnestColumnValueSelectorCursor( + Cursor cursor, + ColumnSelectorFactory baseColumSelectorFactory, + String columnName, + String outputColumnName, + LinkedHashSet allowSet + ) + { + this.baseCursor = cursor; + this.baseColumnSelectorFactory = baseColumSelectorFactory; + this.columnValueSelector = this.baseColumnSelectorFactory.makeColumnValueSelector(columnName); + this.columnName = columnName; + this.index = 0; + this.outputName = outputColumnName; + this.needInitialization = true; + this.allowSet = allowSet; + } + + @Override + public ColumnSelectorFactory getColumnSelectorFactory() + { + return new ColumnSelectorFactory() + { + @Override + public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) + { + if (!outputName.equals(dimensionSpec.getDimension())) { + return baseColumnSelectorFactory.makeDimensionSelector(dimensionSpec); + } + throw new UOE("Unsupported dimension selector while using column value selector for column [%s]", outputName); + } + + @Override + public ColumnValueSelector makeColumnValueSelector(String columnName) + { + if (!outputName.equals(columnName)) { + return baseColumnSelectorFactory.makeColumnValueSelector(columnName); + } + return new ColumnValueSelector() + { + @Override + public double getDouble() + { + Object value = getObject(); + if (value == null) { + return 0; + } + if (value instanceof Number) { + return ((Number) 
value).doubleValue(); + } + throw new UOE("Cannot convert object to double"); + } + + @Override + public float getFloat() + { + Object value = getObject(); + if (value == null) { + return 0; + } + if (value instanceof Number) { + return ((Number) value).floatValue(); + } + throw new UOE("Cannot convert object to float"); + } + + @Override + public long getLong() + { + Object value = getObject(); + if (value == null) { + return 0; + } + if (value instanceof Number) { + return ((Number) value).longValue(); + } + throw new UOE("Cannot convert object to long"); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + columnValueSelector.inspectRuntimeShape(inspector); + } + + @Override + public boolean isNull() + { + return getObject() == null; + } + + @Nullable + @Override + public Object getObject() + { + if (!unnestListForCurrentRow.isEmpty()) { + if (allowSet == null || allowSet.isEmpty()) { + return unnestListForCurrentRow.get(index); + } else if (allowSet.contains((String) unnestListForCurrentRow.get(index))) { + return unnestListForCurrentRow.get(index); + } + } + return null; + } + + @Override + public Class classOfObject() + { + return Object.class; + } + }; + } + + @Nullable + @Override + public ColumnCapabilities getColumnCapabilities(String column) + { + if (!outputName.equals(column)) { + return baseColumnSelectorFactory.getColumnCapabilities(column); + } + final ColumnCapabilities capabilities = baseColumnSelectorFactory.getColumnCapabilities(columnName); + if (capabilities.isArray()) { + return ColumnCapabilitiesImpl.copyOf(capabilities).setType(capabilities.getElementType()); + } + if (capabilities.hasMultipleValues().isTrue()) { + return ColumnCapabilitiesImpl.copyOf(capabilities).setHasMultipleValues(false); + } + return baseColumnSelectorFactory.getColumnCapabilities(columnName); + } + }; + } + + @Override + public DateTime getTime() + { + return baseCursor.getTime(); + } + + @Override + public void advance() + { + 
advanceUninterruptibly(); + BaseQuery.checkInterrupted(); + } + + @Override + public void advanceUninterruptibly() + { + do { + advanceAndUpdate(); + } while (matchAndProceed()); + } + + @Override + public boolean isDone() + { + if (needInitialization && !baseCursor.isDone()) { + initialize(); + } + return baseCursor.isDone(); + } + + @Override + public boolean isDoneOrInterrupted() + { + if (needInitialization && !baseCursor.isDoneOrInterrupted()) { + initialize(); + } + return baseCursor.isDoneOrInterrupted(); + } + + @Override + public void reset() + { + index = 0; + needInitialization = true; + baseCursor.reset(); + } + + /** + * This method populates the objects when the base cursor moves to the next row + * + * @param firstRun flag to populate one time object references to hold values for unnest cursor + */ + private void getNextRow(boolean firstRun) + { + currentVal = this.columnValueSelector.getObject(); + if (currentVal == null) { + if (!firstRun) { + unnestListForCurrentRow = new ArrayList<>(); + } + unnestListForCurrentRow.add(null); + } else { + if (currentVal instanceof List) { + unnestListForCurrentRow = (List) currentVal; + } else if (currentVal instanceof Object[]) { + unnestListForCurrentRow = Arrays.asList((Object[]) currentVal); + } else if (currentVal.getClass().equals(String.class)) { + if (!firstRun) { + unnestListForCurrentRow = new ArrayList<>(); + } + unnestListForCurrentRow.add(currentVal); + } + } + } + + /** + * This initializes the unnest cursor and creates data structures + * to start iterating over the values to be unnested. + * This would also create a bitset for dictionary encoded columns to + * check for matching values specified in allowedList of UnnestDataSource.
+ */ + private void initialize() + { + this.unnestListForCurrentRow = new ArrayList<>(); + getNextRow(needInitialization); + if (allowSet != null) { + if (!allowSet.isEmpty()) { + if (!allowSet.contains((String) unnestListForCurrentRow.get(index))) { + advance(); + } + } + } + needInitialization = false; + } + + /** + * This advances the cursor to move to the next element to be unnested. + * When the last element in a row is unnested, it is also responsible + * to move the base cursor to the next row for unnesting and repopulates + * the data structures, created during initialize(), to point to the new row + */ + private void advanceAndUpdate() + { + if (unnestListForCurrentRow.isEmpty() || index >= unnestListForCurrentRow.size() - 1) { + index = 0; + baseCursor.advance(); + if (!baseCursor.isDone()) { + getNextRow(needInitialization); + } + } else { + index++; + } + } + + /** + * This advances the unnest cursor in cases where an allowList is specified + * and the current value at the unnest cursor is not in the allowList. + * The cursor in such cases is moved till the next match is found. + * + * @return a boolean to indicate whether to stay or move cursor + */ + private boolean matchAndProceed() + { + boolean matchStatus; + if (allowSet == null || allowSet.isEmpty()) { + matchStatus = true; + } else { + matchStatus = allowSet.contains((String) unnestListForCurrentRow.get(index)); + } + return !baseCursor.isDone() && !matchStatus; + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/UnnestDimensionCursor.java b/processing/src/main/java/org/apache/druid/segment/UnnestDimensionCursor.java new file mode 100644 index 00000000000..46a2c626caf --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/UnnestDimensionCursor.java @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.google.common.base.Predicate; +import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.filter.ValueMatcher; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnCapabilitiesImpl; +import org.apache.druid.segment.data.IndexedInts; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.util.BitSet; +import java.util.LinkedHashSet; + +/** + * The cursor to help unnest MVDs with dictionary encoding. + * Consider a segment has 2 rows + * ['a', 'b', 'c'] + * ['d', 'c'] + *

+ * Considering dictionary encoding, these are represented as + *

+ * 'a' -> 0 + * 'b' -> 1 + * 'c' -> 2 + * 'd' -> 3 + *

+ * The baseCursor points to the row of IndexedInts [0, 1, 2] + * while the unnestCursor with each call of advance() moves over individual elements. + *

+ * advance() -> 0 -> 'a' + * advance() -> 1 -> 'b' + * advance() -> 2 -> 'c' + * advance() -> 3 -> 'd' (advances base cursor first) + * advance() -> 2 -> 'c' + *

+ * Total 5 advance calls above + *

+ * The allowSet, if available, helps skip over elements that are not in the allowList by moving the cursor to + * the next available match. The hashSet is converted into a bitset (during initialization) for efficiency. + * If allowSet is ['c', 'd'] then the advance moves over to the next available match + *

+ * advance() -> 2 -> 'c' + * advance() -> 3 -> 'd' (advances base cursor first) + * advance() -> 2 -> 'c' + *

+ * Total 3 advance calls in this case + *

+ * The index reference points to the index of each row that the unnest cursor is accessing + * The indexedInts for each row are held in the indexedIntsForCurrentRow object + *

+ * The needInitialization flag sets up the initial values of indexedIntsForCurrentRow at the beginning of the segment + */ +public class UnnestDimensionCursor implements Cursor +{ + private final Cursor baseCursor; + private final DimensionSelector dimSelector; + private final String columnName; + private final String outputName; + private final LinkedHashSet allowSet; + private final BitSet allowedBitSet; + private final ColumnSelectorFactory baseColumnSelectorFactory; + private int index; + private IndexedInts indexedIntsForCurrentRow; + private boolean needInitialization; + private SingleIndexInts indexIntsForRow; + + public UnnestDimensionCursor( + Cursor cursor, + ColumnSelectorFactory baseColumnSelectorFactory, + String columnName, + String outputColumnName, + LinkedHashSet allowSet + ) + { + this.baseCursor = cursor; + this.baseColumnSelectorFactory = baseColumnSelectorFactory; + this.dimSelector = this.baseColumnSelectorFactory.makeDimensionSelector(DefaultDimensionSpec.of(columnName)); + this.columnName = columnName; + this.index = 0; + this.outputName = outputColumnName; + this.needInitialization = true; + this.allowSet = allowSet; + this.allowedBitSet = new BitSet(); + } + + @Override + public ColumnSelectorFactory getColumnSelectorFactory() + { + return new ColumnSelectorFactory() + { + @Override + public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) + { + if (!outputName.equals(dimensionSpec.getDimension())) { + return baseColumnSelectorFactory.makeDimensionSelector(dimensionSpec); + } + + return new DimensionSelector() + { + @Override + public IndexedInts getRow() + { + // This object reference has been created + // during the call to initialize and referenced henceforth + return indexIntsForRow; + } + + @Override + public ValueMatcher makeValueMatcher(@Nullable String value) + { + final int idForLookup = idLookup().lookupId(value); + if (idForLookup < 0) { + return new ValueMatcher() + { + @Override + public boolean matches() 
+ { + return false; + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + + } + }; + } + + return new ValueMatcher() + { + @Override + public boolean matches() + { + return idForLookup == indexedIntsForCurrentRow.get(index); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + dimSelector.inspectRuntimeShape(inspector); + } + }; + } + + @Override + public ValueMatcher makeValueMatcher(Predicate predicate) + { + return DimensionSelectorUtils.makeValueMatcherGeneric(this, predicate); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + dimSelector.inspectRuntimeShape(inspector); + } + + @Nullable + @Override + public Object getObject() + { + if (indexedIntsForCurrentRow == null) { + return null; + } + if (allowedBitSet.isEmpty()) { + if (allowSet == null || allowSet.isEmpty()) { + return lookupName(indexedIntsForCurrentRow.get(index)); + } + } else if (allowedBitSet.get(indexedIntsForCurrentRow.get(index))) { + return lookupName(indexedIntsForCurrentRow.get(index)); + } + return null; + } + + @Override + public Class classOfObject() + { + return Object.class; + } + + @Override + public int getValueCardinality() + { + if (!allowedBitSet.isEmpty()) { + return allowedBitSet.cardinality(); + } + return dimSelector.getValueCardinality(); + } + + @Nullable + @Override + public String lookupName(int id) + { + return dimSelector.lookupName(id); + } + + @Override + public boolean nameLookupPossibleInAdvance() + { + return dimSelector.nameLookupPossibleInAdvance(); + } + + @Nullable + @Override + public IdLookup idLookup() + { + return dimSelector.idLookup(); + } + }; + } + + /* + This ideally should not be called. 
If called delegate using the makeDimensionSelector + */ + @Override + public ColumnValueSelector makeColumnValueSelector(String columnName) + { + if (!outputName.equals(columnName)) { + return baseColumnSelectorFactory.makeColumnValueSelector(columnName); + } + return makeDimensionSelector(DefaultDimensionSpec.of(columnName)); + } + + @Nullable + @Override + public ColumnCapabilities getColumnCapabilities(String column) + { + if (!outputName.equals(column)) { + return baseColumnSelectorFactory.getColumnCapabilities(column); + } + // This currently returns the same type as of the column to be unnested + // This is fine for STRING types + // But going forward if the dimension to be unnested is of type ARRAY, + // this should strip down to the base type of the array + final ColumnCapabilities capabilities = baseColumnSelectorFactory.getColumnCapabilities(columnName); + if (capabilities.isArray()) { + return ColumnCapabilitiesImpl.copyOf(capabilities).setType(capabilities.getElementType()); + } + if (capabilities.hasMultipleValues().isTrue()) { + return ColumnCapabilitiesImpl.copyOf(capabilities).setHasMultipleValues(false); + } + return baseColumnSelectorFactory.getColumnCapabilities(columnName); + } + }; + } + + @Override + public DateTime getTime() + { + return baseCursor.getTime(); + } + + @Override + public void advance() + { + advanceUninterruptibly(); + BaseQuery.checkInterrupted(); + } + + @Override + public void advanceUninterruptibly() + { + do { + advanceAndUpdate(); + } while (matchAndProceed()); + } + + @Override + public boolean isDone() + { + if (needInitialization && !baseCursor.isDone()) { + initialize(); + } + return baseCursor.isDone(); + } + + @Override + public boolean isDoneOrInterrupted() + { + if (needInitialization && !baseCursor.isDoneOrInterrupted()) { + initialize(); + } + return baseCursor.isDoneOrInterrupted(); + } + + @Override + public void reset() + { + index = 0; + needInitialization = true; + baseCursor.reset(); + } + + /** + * This 
initializes the unnest cursor and creates data structures + * to start iterating over the values to be unnested. + * This would also create a bitset for dictonary encoded columns to + * check for matching values specified in allowedList of UnnestDataSource. + */ + private void initialize() + { + IdLookup idLookup = dimSelector.idLookup(); + this.indexIntsForRow = new SingleIndexInts(); + if (allowSet != null && !allowSet.isEmpty() && idLookup != null) { + for (String s : allowSet) { + if (idLookup.lookupId(s) >= 0) { + allowedBitSet.set(idLookup.lookupId(s)); + } + } + } + if (dimSelector.getObject() != null) { + this.indexedIntsForCurrentRow = dimSelector.getRow(); + } + if (!allowedBitSet.isEmpty()) { + if (!allowedBitSet.get(indexedIntsForCurrentRow.get(index))) { + advance(); + } + } + needInitialization = false; + } + + /** + * This advances the cursor to move to the next element to be unnested. + * When the last element in a row is unnested, it is also responsible + * to move the base cursor to the next row for unnesting and repopulates + * the data structures, created during initialize(), to point to the new row + */ + private void advanceAndUpdate() + { + if (indexedIntsForCurrentRow == null) { + index = 0; + if (!baseCursor.isDone()) { + baseCursor.advanceUninterruptibly(); + } + } else { + if (index >= indexedIntsForCurrentRow.size() - 1) { + if (!baseCursor.isDone()) { + baseCursor.advanceUninterruptibly(); + } + if (!baseCursor.isDone()) { + indexedIntsForCurrentRow = dimSelector.getRow(); + } + index = 0; + } else { + ++index; + } + } + } + + /** + * This advances the unnest cursor in cases where an allowList is specified + * and the current value at the unnest cursor is not in the allowList. + * The cursor in such cases is moved till the next match is found. 
+ * + * @return a boolean to indicate whether to stay or move cursor + */ + private boolean matchAndProceed() + { + boolean matchStatus; + if ((allowSet == null || allowSet.isEmpty()) && allowedBitSet.isEmpty()) { + matchStatus = true; + } else { + matchStatus = allowedBitSet.get(indexedIntsForCurrentRow.get(index)); + } + return !baseCursor.isDone() && !matchStatus; + } + + // Helper class to help in returning + // getRow from the dimensionSelector + // This is set in the initialize method + private class SingleIndexInts implements IndexedInts + { + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + //nothing to inspect + } + + @Override + public int size() + { + // After unnest each row will have a single element + return 1; + } + + @Override + public int get(int idx) + { + return indexedIntsForCurrentRow.get(index); + } + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/UnnestSegmentReference.java b/processing/src/main/java/org/apache/druid/segment/UnnestSegmentReference.java new file mode 100644 index 00000000000..9da6b8132cb --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/UnnestSegmentReference.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.timeline.SegmentId; +import org.apache.druid.utils.CloseableUtils; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.io.Closeable; +import java.io.IOException; +import java.util.LinkedHashSet; +import java.util.Optional; + +/** + * The segment reference for the Unnest Data Source. + * The input column name, output name and the allowSet follow from {@link org.apache.druid.query.UnnestDataSource} + */ +public class UnnestSegmentReference implements SegmentReference +{ + private static final Logger log = new Logger(UnnestSegmentReference.class); + + private final SegmentReference baseSegment; + private final String dimension; + private final String renamedOutputDimension; + private final LinkedHashSet allowSet; + + public UnnestSegmentReference(SegmentReference baseSegment, String dimension, String outputName, LinkedHashSet allowList) + { + this.baseSegment = baseSegment; + this.dimension = dimension; + this.renamedOutputDimension = outputName; + this.allowSet = allowList; + } + + @Override + public Optional acquireReferences() + { + Closer closer = Closer.create(); + try { + boolean acquireFailed = baseSegment.acquireReferences().map(closeable -> { + closer.register(closeable); + return false; + }).orElse(true); + + if (acquireFailed) { + CloseableUtils.closeAndWrapExceptions(closer); + return Optional.empty(); + } else { + return Optional.of(closer); + } + } + catch (Throwable e) { + // acquireReferences is not permitted to throw exceptions. 
+ CloseableUtils.closeAndSuppressExceptions(closer, e::addSuppressed); + log.warn(e, "Exception encountered while trying to acquire reference"); + return Optional.empty(); + } + } + + @Override + public SegmentId getId() + { + return baseSegment.getId(); + } + + @Override + public Interval getDataInterval() + { + return baseSegment.getDataInterval(); + } + + @Nullable + @Override + public QueryableIndex asQueryableIndex() + { + return null; + } + + @Override + public StorageAdapter asStorageAdapter() + { + return new UnnestStorageAdapter( + baseSegment.asStorageAdapter(), + dimension, + renamedOutputDimension, + allowSet + ); + } + + @Override + public void close() throws IOException + { + baseSegment.close(); + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java b/processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java new file mode 100644 index 00000000000..f76ab89270a --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment; + +import com.google.common.collect.Lists; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.query.QueryMetrics; +import org.apache.druid.query.filter.Filter; +import org.apache.druid.query.filter.InDimFilter; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.data.Indexed; +import org.apache.druid.segment.data.ListIndexed; +import org.apache.druid.segment.filter.AndFilter; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.util.Arrays; +import java.util.LinkedHashSet; +import java.util.Objects; + +/** + * This class serves as the Storage Adapter for the Unnest Segment and is responsible for creating the cursors + * If the column is dictionary encoded it creates {@link UnnestDimensionCursor} else {@link UnnestColumnValueSelectorCursor} + * These cursors help navigate the segments for these cases + */ +public class UnnestStorageAdapter implements StorageAdapter +{ + private final StorageAdapter baseAdapter; + private final String dimensionToUnnest; + private final String outputColumnName; + private final LinkedHashSet allowSet; + + public UnnestStorageAdapter( + final StorageAdapter baseAdapter, + final String dimension, + final String outputColumnName, + final LinkedHashSet allowSet + ) + { + this.baseAdapter = baseAdapter; + this.dimensionToUnnest = dimension; + this.outputColumnName = outputColumnName; + this.allowSet = allowSet; + } + + @Override + public Sequence makeCursors( + @Nullable Filter filter, + Interval interval, + VirtualColumns virtualColumns, + Granularity gran, + boolean descending, + @Nullable QueryMetrics queryMetrics + ) + { + Filter updatedFilter; + if (allowSet != null && !allowSet.isEmpty()) { + final InDimFilter allowListFilters; + 
allowListFilters = new InDimFilter(dimensionToUnnest, allowSet); + if (filter != null) { + updatedFilter = new AndFilter(Arrays.asList(filter, allowListFilters)); + } else { + updatedFilter = allowListFilters; + } + } else { + updatedFilter = filter; + } + final Sequence baseCursorSequence = baseAdapter.makeCursors( + updatedFilter, + interval, + virtualColumns, + gran, + descending, + queryMetrics + ); + + return Sequences.map( + baseCursorSequence, + cursor -> { + Objects.requireNonNull(cursor); + Cursor retVal = cursor; + ColumnCapabilities capabilities = cursor.getColumnSelectorFactory().getColumnCapabilities(dimensionToUnnest); + if (capabilities != null) { + if (capabilities.isDictionaryEncoded().and(capabilities.areDictionaryValuesUnique()).isTrue()) { + retVal = new UnnestDimensionCursor( + retVal, + retVal.getColumnSelectorFactory(), + dimensionToUnnest, + outputColumnName, + allowSet + ); + } else { + retVal = new UnnestColumnValueSelectorCursor( + retVal, + retVal.getColumnSelectorFactory(), + dimensionToUnnest, + outputColumnName, + allowSet + ); + } + } else { + retVal = new UnnestColumnValueSelectorCursor( + retVal, + retVal.getColumnSelectorFactory(), + dimensionToUnnest, + outputColumnName, + allowSet + ); + } + return retVal; + } + ); + } + + @Override + public Interval getInterval() + { + return baseAdapter.getInterval(); + } + + @Override + public Indexed getAvailableDimensions() + { + final LinkedHashSet availableDimensions = new LinkedHashSet<>(); + + for (String dim : baseAdapter.getAvailableDimensions()) { + availableDimensions.add(dim); + } + availableDimensions.add(outputColumnName); + return new ListIndexed<>(Lists.newArrayList(availableDimensions)); + } + + @Override + public Iterable getAvailableMetrics() + { + return baseAdapter.getAvailableMetrics(); + } + + @Override + public int getDimensionCardinality(String column) + { + if (!outputColumnName.equals(column)) { + return baseAdapter.getDimensionCardinality(column); + } + return 
baseAdapter.getDimensionCardinality(dimensionToUnnest); + } + + @Override + public DateTime getMinTime() + { + return baseAdapter.getMinTime(); + } + + @Override + public DateTime getMaxTime() + { + return baseAdapter.getMaxTime(); + } + + @Nullable + @Override + public Comparable getMinValue(String column) + { + if (!outputColumnName.equals(column)) { + return baseAdapter.getMinValue(column); + } + return baseAdapter.getMinValue(dimensionToUnnest); + } + + @Nullable + @Override + public Comparable getMaxValue(String column) + { + if (!outputColumnName.equals(column)) { + return baseAdapter.getMaxValue(column); + } + return baseAdapter.getMaxValue(dimensionToUnnest); + } + + @Nullable + @Override + public ColumnCapabilities getColumnCapabilities(String column) + { + if (!outputColumnName.equals(column)) { + return baseAdapter.getColumnCapabilities(column); + } + return baseAdapter.getColumnCapabilities(dimensionToUnnest); + } + + @Override + public int getNumRows() + { + return 0; + } + + @Override + public DateTime getMaxIngestedEventTime() + { + return baseAdapter.getMaxIngestedEventTime(); + } + + @Nullable + @Override + public Metadata getMetadata() + { + return baseAdapter.getMetadata(); + } + + public String getDimensionToUnnest() + { + return dimensionToUnnest; + } +} + diff --git a/processing/src/test/java/org/apache/druid/segment/ListCursor.java b/processing/src/test/java/org/apache/druid/segment/ListCursor.java new file mode 100644 index 00000000000..666bc21be5b --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/ListCursor.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.google.common.base.Predicate; +import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.filter.ValueMatcher; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.data.IndexedInts; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.util.List; + + +/** + * A Cursor that iterates over a user created list. + * This is used to test the base cursor of an UnnestCursor. + * Usages can be found in tests of {@link UnnestColumnValueSelectorCursor} in {@link UnnestColumnValueSelectorCursorTest} + * However this cannot help with {@link UnnestDimensionCursor}. 
+ * Tests for {@link UnnestDimensionCursor} are done alongside tests for {@link UnnestStorageAdapterTest} + */ +public class ListCursor implements Cursor +{ + List baseList; + private int index; + + public ListCursor(List inputList) + { + this.baseList = inputList; + } + + @Override + public ColumnSelectorFactory getColumnSelectorFactory() + { + return new ColumnSelectorFactory() + { + @Override + public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) + { + return new DimensionSelector() + { + @Override + public IndexedInts getRow() + { + return null; + } + + @Override + public ValueMatcher makeValueMatcher(@Nullable String value) + { + return null; + } + + @Override + public ValueMatcher makeValueMatcher(Predicate predicate) + { + return null; + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + + } + + @Nullable + @Override + public Object getObject() + { + if (index < baseList.size()) { + return baseList.get(index); + } + return null; + } + + @Override + public Class classOfObject() + { + return null; + } + + @Override + public int getValueCardinality() + { + return 0; + } + + @Nullable + @Override + public String lookupName(int id) + { + return null; + } + + @Override + public boolean nameLookupPossibleInAdvance() + { + return false; + } + + @Nullable + @Override + public IdLookup idLookup() + { + return null; + } + }; + } + + @Override + public ColumnValueSelector makeColumnValueSelector(String columnName) + { + return new ColumnValueSelector() + { + @Override + public double getDouble() + { + return 0; + } + + @Override + public float getFloat() + { + return 0; + } + + @Override + public long getLong() + { + return 0; + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + + } + + @Override + public boolean isNull() + { + return false; + } + + @Nullable + @Override + public Object getObject() + { + if (index < baseList.size()) { + return baseList.get(index); + } + 
return null; + } + + @Override + public Class classOfObject() + { + return null; + } + }; + } + + @Nullable + @Override + public ColumnCapabilities getColumnCapabilities(String column) + { + return null; + } + }; + } + + @Override + public DateTime getTime() + { + return null; + } + + @Override + public void advance() + { + advanceUninterruptibly(); + BaseQuery.checkInterrupted(); + } + + @Override + public void advanceUninterruptibly() + { + index++; + } + + @Override + public boolean isDone() + { + return index > baseList.size() - 1; + } + + @Override + public boolean isDoneOrInterrupted() + { + return false; + } + + @Override + public void reset() + { + index = 0; + } +} diff --git a/processing/src/test/java/org/apache/druid/segment/UnnestColumnValueSelectorCursorTest.java b/processing/src/test/java/org/apache/druid/segment/UnnestColumnValueSelectorCursorTest.java new file mode 100644 index 00000000000..b3346e1e562 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/UnnestColumnValueSelectorCursorTest.java @@ -0,0 +1,632 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment; + +import org.apache.druid.java.util.common.UOE; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.List; + +public class UnnestColumnValueSelectorCursorTest extends InitializedNullHandlingTest +{ + private static String OUTPUT_NAME = "unnested-column"; + private static LinkedHashSet IGNORE_SET = null; + private static LinkedHashSet IGNORE_SET1 = new LinkedHashSet<>(Arrays.asList("b", "f")); + + + @Test + public void test_list_unnest_cursors() + { + ArrayList baseList = new ArrayList<>(); + for (int i = 0; i < 2; i++) { + List newList = new ArrayList<>(); + for (int j = 0; j < 2; j++) { + newList.add(String.valueOf(i * 2 + j)); + } + baseList.add(newList); + } + ListCursor listCursor = new ListCursor(baseList); + UnnestColumnValueSelectorCursor unnestCursor = new UnnestColumnValueSelectorCursor( + listCursor, + listCursor.getColumnSelectorFactory(), + "dummy", + OUTPUT_NAME, + IGNORE_SET + ); + ColumnValueSelector unnestColumnValueSelector = unnestCursor.getColumnSelectorFactory() + .makeColumnValueSelector(OUTPUT_NAME); + int j = 0; + while (!unnestCursor.isDone()) { + Object colSelectorVal = unnestColumnValueSelector.getObject(); + Assert.assertEquals(colSelectorVal.toString(), String.valueOf(j)); + j++; + unnestCursor.advance(); + } + Assert.assertEquals(j, 4); + } + + @Test + public void test_list_unnest_cursors_user_supplied_list() + { + List inputList = Arrays.asList( + Arrays.asList("a", "b", "c"), + Arrays.asList("e", "f", "g", "h", "i"), + Collections.singletonList("j") + ); + + List expectedResults = Arrays.asList("a", "b", "c", "e", "f", "g", "h", "i", "j"); + + //Create base cursor + ListCursor listCursor = new ListCursor(inputList); + + //Create unnest 
cursor + UnnestColumnValueSelectorCursor unnestCursor = new UnnestColumnValueSelectorCursor( + listCursor, + listCursor.getColumnSelectorFactory(), + "dummy", + OUTPUT_NAME, + IGNORE_SET + ); + ColumnValueSelector unnestColumnValueSelector = unnestCursor.getColumnSelectorFactory() + .makeColumnValueSelector(OUTPUT_NAME); + int k = 0; + while (!unnestCursor.isDone()) { + Object valueSelectorVal = unnestColumnValueSelector.getObject(); + Assert.assertEquals(valueSelectorVal.toString(), expectedResults.get(k)); + k++; + unnestCursor.advance(); + } + Assert.assertEquals(k, 9); + } + + @Test + public void test_list_unnest_cursors_user_supplied_list_only_nulls() + { + List inputList = Arrays.asList( + Collections.singletonList(null), + Arrays.asList(null, null), + Collections.singletonList(null) + ); + + List expectedResults = Arrays.asList(null, null, null, null); + + //Create base cursor + ListCursor listCursor = new ListCursor(inputList); + + //Create unnest cursor + UnnestColumnValueSelectorCursor unnestCursor = new UnnestColumnValueSelectorCursor( + listCursor, + listCursor.getColumnSelectorFactory(), + "dummy", + OUTPUT_NAME, + IGNORE_SET + ); + ColumnValueSelector unnestColumnValueSelector = unnestCursor.getColumnSelectorFactory() + .makeColumnValueSelector(OUTPUT_NAME); + int k = 0; + while (!unnestCursor.isDone()) { + Object valueSelectorVal = unnestColumnValueSelector.getObject(); + Assert.assertNull(valueSelectorVal); + k++; + unnestCursor.advance(); + } + Assert.assertEquals(k, 4); + } + + @Test + public void test_list_unnest_cursors_user_supplied_list_mixed_with_nulls() + { + List inputList = Arrays.asList( + Arrays.asList("a", "b"), + Arrays.asList("b", "c"), + "d", + null, + null, + null + ); + + List expectedResults = Arrays.asList("a", "b", "b", "c", "d", null, null, null); + + //Create base cursor + ListCursor listCursor = new ListCursor(inputList); + + //Create unnest cursor + UnnestColumnValueSelectorCursor unnestCursor = new 
UnnestColumnValueSelectorCursor( + listCursor, + listCursor.getColumnSelectorFactory(), + "dummy", + OUTPUT_NAME, + IGNORE_SET + ); + ColumnValueSelector unnestColumnValueSelector = unnestCursor.getColumnSelectorFactory() + .makeColumnValueSelector(OUTPUT_NAME); + int k = 0; + while (!unnestCursor.isDone()) { + Object valueSelectorVal = unnestColumnValueSelector.getObject(); + if (valueSelectorVal == null) { + Assert.assertEquals(null, expectedResults.get(k)); + } else { + Assert.assertEquals(valueSelectorVal.toString(), expectedResults.get(k)); + } + k++; + unnestCursor.advance(); + } + Assert.assertEquals(k, 8); + } + + @Test + public void test_list_unnest_cursors_user_supplied_strings_and_no_lists() + { + List inputList = Arrays.asList("a", "b", "c", "e", "f", "g", "h", "i", "j"); + + List expectedResults = Arrays.asList("a", "b", "c", "e", "f", "g", "h", "i", "j"); + + //Create base cursor + ListCursor listCursor = new ListCursor(inputList); + + //Create unnest cursor + UnnestColumnValueSelectorCursor unnestCursor = new UnnestColumnValueSelectorCursor( + listCursor, + listCursor.getColumnSelectorFactory(), + "dummy", + OUTPUT_NAME, + IGNORE_SET + ); + ColumnValueSelector unnestColumnValueSelector = unnestCursor.getColumnSelectorFactory() + .makeColumnValueSelector(OUTPUT_NAME); + int k = 0; + while (!unnestCursor.isDone()) { + Object valueSelectorVal = unnestColumnValueSelector.getObject(); + Assert.assertEquals(valueSelectorVal.toString(), expectedResults.get(k)); + k++; + unnestCursor.advance(); + } + Assert.assertEquals(k, 9); + } + + @Test + public void test_list_unnest_cursors_user_supplied_strings_mixed_with_list() + { + List inputList = Arrays.asList("a", "b", "c", "e", "f", Arrays.asList("g", "h"), "i", "j"); + + List expectedResults = Arrays.asList("a", "b", "c", "e", "f", "g", "h", "i", "j"); + + //Create base cursor + ListCursor listCursor = new ListCursor(inputList); + + //Create unnest cursor + UnnestColumnValueSelectorCursor unnestCursor = new 
UnnestColumnValueSelectorCursor( + listCursor, + listCursor.getColumnSelectorFactory(), + "dummy", + OUTPUT_NAME, + IGNORE_SET + ); + ColumnValueSelector unnestColumnValueSelector = unnestCursor.getColumnSelectorFactory() + .makeColumnValueSelector(OUTPUT_NAME); + int k = 0; + while (!unnestCursor.isDone()) { + Object valueSelectorVal = unnestColumnValueSelector.getObject(); + Assert.assertEquals(valueSelectorVal.toString(), expectedResults.get(k)); + k++; + unnestCursor.advance(); + } + Assert.assertEquals(k, 9); + } + + @Test + public void test_list_unnest_cursors_user_supplied_lists_three_levels() + { + List inputList = Arrays.asList( + Arrays.asList("a", "b", "c"), + Arrays.asList("e", "f", "g", "h", "i"), + Arrays.asList("j", Arrays.asList("a", "b")) + ); + + List expectedResults = Arrays.asList("a", "b", "c", "e", "f", "g", "h", "i", "j", Arrays.asList("a", "b")); + + //Create base cursor + ListCursor listCursor = new ListCursor(inputList); + + //Create unnest cursor + UnnestColumnValueSelectorCursor unnestCursor = new UnnestColumnValueSelectorCursor( + listCursor, + listCursor.getColumnSelectorFactory(), + "dummy", + OUTPUT_NAME, + IGNORE_SET + ); + ColumnValueSelector unnestColumnValueSelector = unnestCursor.getColumnSelectorFactory() + .makeColumnValueSelector(OUTPUT_NAME); + int k = 0; + while (!unnestCursor.isDone()) { + Object valueSelectorVal = unnestColumnValueSelector.getObject(); + Assert.assertEquals(valueSelectorVal.toString(), expectedResults.get(k).toString()); + k++; + unnestCursor.advance(); + } + Assert.assertEquals(k, 10); + } + + @Test + public void test_list_unnest_of_unnest_cursors_user_supplied_list_three_levels() + { + List inputList = Arrays.asList( + Arrays.asList("a", "b", "c"), + Arrays.asList("e", "f", "g", "h", "i"), + Arrays.asList("j", Arrays.asList("a", "b")) + ); + + List expectedResults = Arrays.asList("a", "b", "c", "e", "f", "g", "h", "i", "j", "a", "b"); + + //Create base cursor + ListCursor listCursor = new 
ListCursor(inputList); + + //Create unnest cursor + UnnestColumnValueSelectorCursor childCursor = new UnnestColumnValueSelectorCursor( + listCursor, + listCursor.getColumnSelectorFactory(), + "dummy", + OUTPUT_NAME, + IGNORE_SET + ); + UnnestColumnValueSelectorCursor parentCursor = new UnnestColumnValueSelectorCursor( + childCursor, + childCursor.getColumnSelectorFactory(), + OUTPUT_NAME, + "tmp-out", + IGNORE_SET + ); + ColumnValueSelector unnestColumnValueSelector = parentCursor.getColumnSelectorFactory() + .makeColumnValueSelector("tmp-out"); + int k = 0; + while (!parentCursor.isDone()) { + Object valueSelectorVal = unnestColumnValueSelector.getObject(); + Assert.assertEquals(valueSelectorVal.toString(), expectedResults.get(k).toString()); + k++; + parentCursor.advance(); + } + Assert.assertEquals(k, 11); + } + + @Test + public void test_list_unnest_cursors_user_supplied_list_with_nulls() + { + List inputList = Arrays.asList( + Arrays.asList("a", "b", "c"), + Arrays.asList("e", "f", "g", "h", "i", null), + Collections.singletonList("j") + ); + + List expectedResults = Arrays.asList("a", "b", "c", "e", "f", "g", "h", "i", null, "j"); + + + //Create base cursor + ListCursor listCursor = new ListCursor(inputList); + + //Create unnest cursor + UnnestColumnValueSelectorCursor unnestCursor = new UnnestColumnValueSelectorCursor( + listCursor, + listCursor.getColumnSelectorFactory(), + "dummy", + OUTPUT_NAME, + IGNORE_SET + ); + ColumnValueSelector unnestColumnValueSelector = unnestCursor.getColumnSelectorFactory() + .makeColumnValueSelector(OUTPUT_NAME); + int k = 0; + while (!unnestCursor.isDone()) { + Object valueSelectorVal = unnestColumnValueSelector.getObject(); + if (valueSelectorVal == null) { + Assert.assertEquals(null, expectedResults.get(k)); + } else { + Assert.assertEquals(valueSelectorVal.toString(), expectedResults.get(k)); + } + k++; + unnestCursor.advance(); + } + Assert.assertEquals(k, expectedResults.size()); + } + + @Test + public void 
test_list_unnest_cursors_user_supplied_list_with_dups() + { + List inputList = Arrays.asList( + Arrays.asList("a", "a", "a"), + Arrays.asList("e", "f", null, "h", "i", null), + Collections.singletonList("j") + ); + + List expectedResults = Arrays.asList("a", "a", "a", "e", "f", null, "h", "i", null, "j"); + + //Create base cursor + ListCursor listCursor = new ListCursor(inputList); + + //Create unnest cursor + UnnestColumnValueSelectorCursor unnestCursor = new UnnestColumnValueSelectorCursor( + listCursor, + listCursor.getColumnSelectorFactory(), + "dummy", + OUTPUT_NAME, + IGNORE_SET + ); + ColumnValueSelector unnestColumnValueSelector = unnestCursor.getColumnSelectorFactory() + .makeColumnValueSelector(OUTPUT_NAME); + int k = 0; + while (!unnestCursor.isDone()) { + Object valueSelectorVal = unnestColumnValueSelector.getObject(); + if (valueSelectorVal == null) { + Assert.assertEquals(null, expectedResults.get(k)); + } else { + Assert.assertEquals(valueSelectorVal.toString(), expectedResults.get(k)); + } + k++; + unnestCursor.advance(); + } + Assert.assertEquals(k, 10); + } + + @Test + public void test_list_unnest_cursors_user_supplied_list_with_ignore_set() + { + List inputList = Arrays.asList( + Arrays.asList("a", "b", "c"), + Arrays.asList("e", "f", "g", "h", "i"), + Collections.singletonList("j") + ); + + List expectedResults = Arrays.asList("b", "f"); + + //Create base cursor + ListCursor listCursor = new ListCursor(inputList); + + //Create unnest cursor + UnnestColumnValueSelectorCursor unnestCursor = new UnnestColumnValueSelectorCursor( + listCursor, + listCursor.getColumnSelectorFactory(), + "dummy", + OUTPUT_NAME, + IGNORE_SET1 + ); + ColumnValueSelector unnestColumnValueSelector = unnestCursor.getColumnSelectorFactory() + .makeColumnValueSelector(OUTPUT_NAME); + int k = 0; + while (!unnestCursor.isDone()) { + Object valueSelectorVal = unnestColumnValueSelector.getObject(); + if (valueSelectorVal == null) { + Assert.assertEquals(null, 
expectedResults.get(k)); + } else { + Assert.assertEquals(valueSelectorVal.toString(), expectedResults.get(k)); + } + k++; + unnestCursor.advance(); + } + Assert.assertEquals(k, 2); + } + + @Test + public void test_list_unnest_cursors_user_supplied_list_double() + { + List inputList = Arrays.asList( + Arrays.asList(1, 2, 3), + Arrays.asList(4, 5, 6, 7, 8), + Collections.singletonList(9) + ); + + List expectedResults = Arrays.asList(1d, 2d, 3d, 4d, 5d, 6d, 7d, 8d, 9d); + + //Create base cursor + ListCursor listCursor = new ListCursor(inputList); + + //Create unnest cursor + UnnestColumnValueSelectorCursor unnestCursor = new UnnestColumnValueSelectorCursor( + listCursor, + listCursor.getColumnSelectorFactory(), + "dummy", + OUTPUT_NAME, + IGNORE_SET + ); + ColumnValueSelector unnestColumnValueSelector = unnestCursor.getColumnSelectorFactory() + .makeColumnValueSelector(OUTPUT_NAME); + int k = 0; + while (!unnestCursor.isDone()) { + Double valueSelectorVal = unnestColumnValueSelector.getDouble(); + Assert.assertEquals(valueSelectorVal, expectedResults.get(k)); + k++; + unnestCursor.advance(); + } + Assert.assertEquals(k, 9); + } + + @Test + public void test_list_unnest_cursors_user_supplied_list_float() + { + List inputList = Arrays.asList( + Arrays.asList(1, 2, 3), + Arrays.asList(4, 5, 6, 7, 8), + Collections.singletonList(9) + ); + + List expectedResults = Arrays.asList(1f, 2f, 3f, 4f, 5f, 6f, 7f, 8f, 9f); + + //Create base cursor + ListCursor listCursor = new ListCursor(inputList); + + //Create unnest cursor + UnnestColumnValueSelectorCursor unnestCursor = new UnnestColumnValueSelectorCursor( + listCursor, + listCursor.getColumnSelectorFactory(), + "dummy", + OUTPUT_NAME, + IGNORE_SET + ); + ColumnValueSelector unnestColumnValueSelector = unnestCursor.getColumnSelectorFactory() + .makeColumnValueSelector(OUTPUT_NAME); + int k = 0; + while (!unnestCursor.isDone()) { + Float valueSelectorVal = unnestColumnValueSelector.getFloat(); + 
Assert.assertEquals(valueSelectorVal, expectedResults.get(k)); + k++; + unnestCursor.advance(); + } + Assert.assertEquals(k, 9); + } + + @Test + public void test_list_unnest_cursors_user_supplied_list_long() + { + List inputList = Arrays.asList( + Arrays.asList(1, 2, 3), + Arrays.asList(4, 5, 6, 7, 8), + Collections.singletonList(9) + ); + + List expectedResults = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L); + + //Create base cursor + ListCursor listCursor = new ListCursor(inputList); + + //Create unnest cursor + UnnestColumnValueSelectorCursor unnestCursor = new UnnestColumnValueSelectorCursor( + listCursor, + listCursor.getColumnSelectorFactory(), + "dummy", + OUTPUT_NAME, + IGNORE_SET + ); + ColumnValueSelector unnestColumnValueSelector = unnestCursor.getColumnSelectorFactory() + .makeColumnValueSelector(OUTPUT_NAME); + + int k = 0; + while (!unnestCursor.isDone()) { + Object obj = unnestColumnValueSelector.getObject(); + Assert.assertNotNull(obj); + Long valueSelectorVal = unnestColumnValueSelector.getLong(); + Assert.assertEquals(valueSelectorVal, expectedResults.get(k)); + k++; + unnestCursor.advance(); + } + Assert.assertEquals(k, 9); + } + + @Test + public void test_list_unnest_cursors_user_supplied_list_three_level_arrays_and_methods() + { + List inputList = Arrays.asList( + Arrays.asList("a", "b", "c"), + Arrays.asList("e", "f", "g", "h", "i"), + Arrays.asList("j", Arrays.asList("a", "b")) + ); + + List expectedResults = Arrays.asList("a", "b", "c", "e", "f", "g", "h", "i", "j", Arrays.asList("a", "b")); + + //Create base cursor + ListCursor listCursor = new ListCursor(inputList); + + //Create unnest cursor + UnnestColumnValueSelectorCursor unnestCursor = new UnnestColumnValueSelectorCursor( + listCursor, + listCursor.getColumnSelectorFactory(), + "dummy", + OUTPUT_NAME, + IGNORE_SET + ); + ColumnValueSelector unnestColumnValueSelector = unnestCursor.getColumnSelectorFactory() + .makeColumnValueSelector(OUTPUT_NAME); + + int k = 0; + while 
(!unnestCursor.isDone()) { + Object valueSelectorVal = unnestColumnValueSelector.getObject(); + Assert.assertEquals(valueSelectorVal.toString(), expectedResults.get(k).toString()); + k++; + unnestCursor.advance(); + } + Assert.assertEquals(k, 10); + unnestCursor.reset(); + Assert.assertFalse(unnestCursor.isDoneOrInterrupted()); + } + + @Test(expected = UOE.class) + public void test_list_unnest_cursors_dimSelector() + { + List inputList = Arrays.asList( + Arrays.asList("a", "b", "c"), + Arrays.asList("e", "f", "g", "h", "i"), + Collections.singletonList("j") + ); + + List expectedResults = Arrays.asList("a", "b", "c", "e", "f", "g", "h", "i", "j"); + + //Create base cursor + ListCursor listCursor = new ListCursor(inputList); + + //Create unnest cursor + UnnestColumnValueSelectorCursor unnestCursor = new UnnestColumnValueSelectorCursor( + listCursor, + listCursor.getColumnSelectorFactory(), + "dummy", + OUTPUT_NAME, + IGNORE_SET + ); + unnestCursor.getColumnSelectorFactory().makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_NAME)); + } + + @Test + public void test_list_unnest_cursors_user_supplied_list_of_integers() + { + List inputList = Arrays.asList( + Arrays.asList(1, 2, 3), + Arrays.asList(4, 5, 6, 7, 8), + Collections.singletonList(9) + ); + + List expectedResults = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9); + + //Create base cursor + ListCursor listCursor = new ListCursor(inputList); + + //Create unnest cursor + UnnestColumnValueSelectorCursor unnestCursor = new UnnestColumnValueSelectorCursor( + listCursor, + listCursor.getColumnSelectorFactory(), + "dummy", + OUTPUT_NAME, + IGNORE_SET + ); + ColumnValueSelector unnestColumnValueSelector = unnestCursor.getColumnSelectorFactory() + .makeColumnValueSelector(OUTPUT_NAME); + int k = 0; + while (!unnestCursor.isDone()) { + Object valueSelectorVal = unnestColumnValueSelector.getObject(); + Assert.assertEquals(valueSelectorVal.toString(), expectedResults.get(k).toString()); + k++; + unnestCursor.advance(); + } + 
Assert.assertEquals(k, 9); + } +} + diff --git a/processing/src/test/java/org/apache/druid/segment/UnnestStorageAdapterTest.java b/processing/src/test/java/org/apache/druid/segment/UnnestStorageAdapterTest.java new file mode 100644 index 00000000000..35d42b82d4b --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/UnnestStorageAdapterTest.java @@ -0,0 +1,399 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.generator.GeneratorBasicSchemas;
+import org.apache.druid.segment.generator.GeneratorSchemaInfo;
+import org.apache.druid.segment.generator.SegmentGenerator;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.LinearShardSpec;
+import org.apache.druid.utils.CloseableUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.List;
+
+/**
+ * Tests for {@link UnnestStorageAdapter}, run against an {@link IncrementalIndexStorageAdapter} that is
+ * generated once per class from the "expression-testbench" schema with two rows.
+ *
+ * Four adapters are exercised: a plain unnest of {@link #COLUMNNAME}, the same unnest restricted by
+ * {@link #IGNORE_SET}, and an unnest stacked on top of each of those two.
+ */
+public class UnnestStorageAdapterTest extends InitializedNullHandlingTest
+{
+  // Shared fixtures. They are assigned in setup() and therefore cannot be final.
+  private static Closer CLOSER;
+  private static IncrementalIndex INCREMENTAL_INDEX;
+  private static IncrementalIndexStorageAdapter INCREMENTAL_INDEX_STORAGE_ADAPTER;
+  // Unnests COLUMNNAME into OUTPUT_COLUMN_NAME without a value filter.
+  private static UnnestStorageAdapter UNNEST_STORAGE_ADAPTER;
+  // Unnests COLUMNNAME into OUTPUT_COLUMN_NAME, restricted by IGNORE_SET.
+  private static UnnestStorageAdapter UNNEST_STORAGE_ADAPTER1;
+  // Unnest stacked on UNNEST_STORAGE_ADAPTER, writing to OUTPUT_COLUMN_NAME1, without a value filter.
+  private static UnnestStorageAdapter UNNEST_STORAGE_ADAPTER2;
+  // Unnest stacked on UNNEST_STORAGE_ADAPTER1, writing to OUTPUT_COLUMN_NAME1, restricted by IGNORE_SET.
+  private static UnnestStorageAdapter UNNEST_STORAGE_ADAPTER3;
+  private static List<StorageAdapter> ADAPTERS;
+
+  private static final String COLUMNNAME = "multi-string1";
+  private static final String OUTPUT_COLUMN_NAME = "unnested-multi-string1";
+  private static final String OUTPUT_COLUMN_NAME1 = "unnested-multi-string1-again";
+  // NOTE(review): despite the name, the tests below describe this set as the allow-list of values
+  // that survive the unnest (3 entries -> 3 output rows); confirm against UnnestStorageAdapter.
+  private static final LinkedHashSet<String> IGNORE_SET = new LinkedHashSet<>(Arrays.asList("1", "3", "5"));
+
+  /**
+   * Builds the two-row incremental index and the four unnest adapters shared by every test.
+   * Closeable resources are registered with {@link #CLOSER} so that {@link #teardown()} releases them.
+   */
+  @BeforeClass
+  public static void setup()
+  {
+    CLOSER = Closer.create();
+    final GeneratorSchemaInfo schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get("expression-testbench");
+
+    final DataSegment dataSegment = DataSegment.builder()
+                                               .dataSource("foo")
+                                               .interval(schemaInfo.getDataInterval())
+                                               .version("1")
+                                               .shardSpec(new LinearShardSpec(0))
+                                               .size(0)
+                                               .build();
+    final SegmentGenerator segmentGenerator = CLOSER.register(new SegmentGenerator());
+
+    final int numRows = 2;
+    INCREMENTAL_INDEX = CLOSER.register(
+        segmentGenerator.generateIncrementalIndex(dataSegment, schemaInfo, Granularities.HOUR, numRows)
+    );
+    INCREMENTAL_INDEX_STORAGE_ADAPTER = new IncrementalIndexStorageAdapter(INCREMENTAL_INDEX);
+    UNNEST_STORAGE_ADAPTER = new UnnestStorageAdapter(
+        INCREMENTAL_INDEX_STORAGE_ADAPTER,
+        COLUMNNAME,
+        OUTPUT_COLUMN_NAME,
+        null
+    );
+    UNNEST_STORAGE_ADAPTER1 = new UnnestStorageAdapter(
+        INCREMENTAL_INDEX_STORAGE_ADAPTER,
+        COLUMNNAME,
+        OUTPUT_COLUMN_NAME,
+        IGNORE_SET
+    );
+    UNNEST_STORAGE_ADAPTER2 = new UnnestStorageAdapter(
+        UNNEST_STORAGE_ADAPTER,
+        COLUMNNAME,
+        OUTPUT_COLUMN_NAME1,
+        null
+    );
+    UNNEST_STORAGE_ADAPTER3 = new UnnestStorageAdapter(
+        UNNEST_STORAGE_ADAPTER1,
+        COLUMNNAME,
+        OUTPUT_COLUMN_NAME1,
+        IGNORE_SET
+    );
+    ADAPTERS = ImmutableList.of(
+        UNNEST_STORAGE_ADAPTER,
+        UNNEST_STORAGE_ADAPTER1,
+        UNNEST_STORAGE_ADAPTER2,
+        UNNEST_STORAGE_ADAPTER3
+    );
+  }
+
+  /**
+   * Releases everything registered with {@link #CLOSER}.
+   */
+  @AfterClass
+  public static void teardown()
+  {
+    // Best-effort cleanup: a failure while closing fixtures is deliberately ignored.
+    CloseableUtils.closeAndSuppressExceptions(CLOSER, throwable -> {
+    });
+  }
+
+  /**
+   * Checks the metadata-style methods (time bounds, row count, metadata, column type of the unnested
+   * source column, dimension being unnested) on every adapter in {@link #ADAPTERS}.
+   */
+  @Test
+  public void test_group_of_unnest_adapters_methods()
+  {
+    for (StorageAdapter adapter : ADAPTERS) {
+      Assert.assertEquals(
+          DateTimes.of("2000-01-01T23:00:00.000Z"),
+          adapter.getMaxTime()
+      );
+      Assert.assertEquals(
+          DateTimes.of("2000-01-01T12:00:00.000Z"),
+          adapter.getMinTime()
+      );
+      // Fail with a clear message here rather than with an NPE in the column type comparison below.
+      Assert.assertNotNull(adapter.getColumnCapabilities(COLUMNNAME));
+      Assert.assertEquals(0, adapter.getNumRows());
+      Assert.assertNotNull(adapter.getMetadata());
+      Assert.assertEquals(
+          DateTimes.of("2000-01-01T23:59:59.999Z"),
+          adapter.getMaxIngestedEventTime()
+      );
+      Assert.assertEquals(
+          INCREMENTAL_INDEX_STORAGE_ADAPTER.getColumnCapabilities(COLUMNNAME).toColumnType(),
+          adapter.getColumnCapabilities(COLUMNNAME).toColumnType()
+      );
+      Assert.assertEquals(COLUMNNAME, ((UnnestStorageAdapter) adapter).getDimensionToUnnest());
+    }
+  }
+
+  /**
+   * Checks the value type reported by the plain unnest adapter for a set of base-table columns and
+   * for the unnested output column.
+   */
+  @Test
+  public void test_group_of_unnest_adapters_column_capabilities()
+  {
+    // columnsInTable.get(i) is expected to report valueTypes.get(i).
+    List<String> columnsInTable = Arrays.asList(
+        "string1",
+        "long1",
+        "double1",
+        "float1",
+        COLUMNNAME,
+        OUTPUT_COLUMN_NAME
+    );
+    List<ValueType> valueTypes = Arrays.asList(
+        ValueType.STRING,
+        ValueType.LONG,
+        ValueType.DOUBLE,
+        ValueType.FLOAT,
+        ValueType.STRING,
+        ValueType.STRING
+    );
+    UnnestStorageAdapter adapter = UNNEST_STORAGE_ADAPTER;
+
+    for (int i = 0; i < columnsInTable.size(); i++) {
+      ColumnCapabilities capabilities = adapter.getColumnCapabilities(columnsInTable.get(i));
+      Assert.assertEquals(valueTypes.get(i), capabilities.getType());
+    }
+    Assert.assertEquals(COLUMNNAME, adapter.getDimensionToUnnest());
+  }
+
+  /**
+   * Walks the cursor of the plain unnest adapter and checks the number of unnested rows.
+   */
+  @Test
+  public void test_unnest_adapters_basic()
+  {
+    Sequence<Cursor> cursorSequence = UNNEST_STORAGE_ADAPTER.makeCursors(
+        null,
+        UNNEST_STORAGE_ADAPTER.getInterval(),
+        VirtualColumns.EMPTY,
+        Granularities.ALL,
+        false,
+        null
+    );
+
+    cursorSequence.accumulate(null, (accumulated, cursor) -> {
+      ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
+
+      DimensionSelector dimSelector = factory.makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_COLUMN_NAME));
+      int count = 0;
+      while (!cursor.isDone()) {
+        // Read the value only to exercise the selector on every row; nothing is assumed about it.
+        dimSelector.getObject();
+        cursor.advance();
+        count++;
+      }
+      // Each row has 8 entries, so unnesting 2 rows yields 16 rows.
+      Assert.assertEquals(16, count);
+      return null;
+    });
+  }
+
+  /**
+   * Walks the cursor of an unnest stacked on another unnest (no filtering at either level) and checks
+   * the row count and the value cardinality of the outer unnested column.
+   */
+  @Test
+  public void test_two_levels_of_unnest_adapters()
+  {
+    Sequence<Cursor> cursorSequence = UNNEST_STORAGE_ADAPTER2.makeCursors(
+        null,
+        UNNEST_STORAGE_ADAPTER2.getInterval(),
+        VirtualColumns.EMPTY,
+        Granularities.ALL,
+        false,
+        null
+    );
+
+    cursorSequence.accumulate(null, (accumulated, cursor) -> {
+      ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
+
+      DimensionSelector dimSelector = factory.makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_COLUMN_NAME1));
+      ColumnValueSelector valueSelector = factory.makeColumnValueSelector(OUTPUT_COLUMN_NAME1);
+
+      int count = 0;
+      while (!cursor.isDone()) {
+        // Read both selectors only to exercise them on every row; nothing is assumed about the values.
+        dimSelector.getObject();
+        valueSelector.getObject();
+        cursor.advance();
+        count++;
+      }
+      // Each row has 8 entries: unnesting 2 rows gives 16 entries (also the value cardinality),
+      // and unnesting that result again gives 16 * 8 = 128 rows.
+      Assert.assertEquals(128, count);
+      Assert.assertEquals(16, dimSelector.getValueCardinality());
+      return null;
+    });
+  }
+
+  /**
+   * Walks the cursor of the unnest adapter restricted by {@link #IGNORE_SET} and checks the row count
+   * and value cardinality of the unnested column.
+   */
+  @Test
+  public void test_unnest_adapters_with_allowList()
+  {
+    Sequence<Cursor> cursorSequence = UNNEST_STORAGE_ADAPTER1.makeCursors(
+        null,
+        UNNEST_STORAGE_ADAPTER1.getInterval(),
+        VirtualColumns.EMPTY,
+        Granularities.ALL,
+        false,
+        null
+    );
+
+    cursorSequence.accumulate(null, (accumulated, cursor) -> {
+      ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
+
+      DimensionSelector dimSelector = factory.makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_COLUMN_NAME));
+      ColumnValueSelector valueSelector = factory.makeColumnValueSelector(OUTPUT_COLUMN_NAME);
+
+      int count = 0;
+      while (!cursor.isDone()) {
+        // Read both selectors only to exercise them on every row; nothing is assumed about the values.
+        dimSelector.getObject();
+        valueSelector.getObject();
+        cursor.advance();
+        count++;
+      }
+      // Each row has 8 distinct entries and the allow-list has 3 entries (also the value
+      // cardinality), so the unnest produces 3 distinct entries.
+      Assert.assertEquals(3, count);
+      Assert.assertEquals(3, dimSelector.getValueCardinality());
+      return null;
+    });
+  }
+
+  /**
+   * Checks adapter-level methods and the cursor output of a filtered unnest stacked on another
+   * filtered unnest.
+   */
+  @Test
+  public void test_two_levels_of_unnest_adapters_with_allowList()
+  {
+    Sequence<Cursor> cursorSequence = UNNEST_STORAGE_ADAPTER3.makeCursors(
+        null,
+        UNNEST_STORAGE_ADAPTER3.getInterval(),
+        VirtualColumns.EMPTY,
+        Granularities.ALL,
+        false,
+        null
+    );
+    UnnestStorageAdapter adapter = UNNEST_STORAGE_ADAPTER3;
+    Assert.assertEquals(COLUMNNAME, adapter.getDimensionToUnnest());
+    Assert.assertEquals(
+        ColumnCapabilities.Capable.TRUE,
+        adapter.getColumnCapabilities(OUTPUT_COLUMN_NAME).isDictionaryEncoded()
+    );
+    // The unnested output column is expected to report the same bounds as the column it came from.
+    Assert.assertEquals(adapter.getMaxValue(COLUMNNAME), adapter.getMaxValue(OUTPUT_COLUMN_NAME));
+    Assert.assertEquals(adapter.getMinValue(COLUMNNAME), adapter.getMinValue(OUTPUT_COLUMN_NAME));
+
+    cursorSequence.accumulate(null, (accumulated, cursor) -> {
+      ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
+
+      DimensionSelector dimSelector = factory.makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_COLUMN_NAME1));
+      ColumnValueSelector valueSelector = factory.makeColumnValueSelector(OUTPUT_COLUMN_NAME1);
+
+      int count = 0;
+      while (!cursor.isDone()) {
+        // Read both selectors only to exercise them on every row; nothing is assumed about the values.
+        dimSelector.getObject();
+        valueSelector.getObject();
+        cursor.advance();
+        count++;
+      }
+      // Each row has 8 distinct entries and the allow-list has 3 entries (also the value
+      // cardinality): the first unnest yields 3 distinct entries and unnesting that result
+      // again yields 3 * 3 = 9 entries.
+      Assert.assertEquals(9, count);
+      Assert.assertEquals(3, dimSelector.getValueCardinality());
+      return null;
+    });
+  }
+
+  /**
+   * Checks adapter-level methods and the per-row {@link DimensionSelector} behaviour (id lookup, row
+   * contents, value matcher creation) of the unnest adapter restricted by {@link #IGNORE_SET}.
+   */
+  @Test
+  public void test_unnest_adapters_methods_with_allowList()
+  {
+    Sequence<Cursor> cursorSequence = UNNEST_STORAGE_ADAPTER1.makeCursors(
+        null,
+        UNNEST_STORAGE_ADAPTER1.getInterval(),
+        VirtualColumns.EMPTY,
+        Granularities.ALL,
+        false,
+        null
+    );
+    UnnestStorageAdapter adapter = UNNEST_STORAGE_ADAPTER1;
+    Assert.assertEquals(COLUMNNAME, adapter.getDimensionToUnnest());
+    Assert.assertEquals(
+        ColumnCapabilities.Capable.TRUE,
+        adapter.getColumnCapabilities(OUTPUT_COLUMN_NAME).isDictionaryEncoded()
+    );
+    // The unnested output column is expected to report the same bounds as the column it came from.
+    Assert.assertEquals(adapter.getMaxValue(COLUMNNAME), adapter.getMaxValue(OUTPUT_COLUMN_NAME));
+    Assert.assertEquals(adapter.getMinValue(COLUMNNAME), adapter.getMinValue(OUTPUT_COLUMN_NAME));
+
+    cursorSequence.accumulate(null, (accumulated, cursor) -> {
+      ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
+
+      DimensionSelector dimSelector = factory.makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_COLUMN_NAME));
+      IdLookup idlookUp = dimSelector.idLookup();
+      Assert.assertFalse(dimSelector.isNull());
+      // Expected dictionary id of the value at each successive cursor position.
+      int[] indices = new int[]{1, 3, 5};
+      int count = 0;
+      while (!cursor.isDone()) {
+        Object dimSelectorVal = dimSelector.getObject();
+        Assert.assertEquals(indices[count], idlookUp.lookupId((String) dimSelectorVal));
+        // after unnest first entry in get row should equal the object
+        // and the row size will always be 1
+        Assert.assertEquals(indices[count], dimSelector.getRow().get(0));
+        Assert.assertEquals(1, dimSelector.getRow().size());
+        Assert.assertNotNull(dimSelector.makeValueMatcher(OUTPUT_COLUMN_NAME));
+        cursor.advance();
+        count++;
+      }
+      Assert.assertEquals(3, dimSelector.getValueCardinality());
+      Assert.assertEquals(3, count);
+      return null;
+    });
+  }
+}