diff --git a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java index d02596900df..650bd0ae452 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java @@ -35,7 +35,7 @@ import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.query.rowsandcols.column.ColumnAccessor; -import org.apache.druid.query.rowsandcols.column.NullColumnAccessor; +import org.apache.druid.query.rowsandcols.column.NullColumn; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; @@ -165,7 +165,7 @@ public class WindowOperatorQueryQueryToolChest extends QueryToolChest List emptyToNull(List list) - { - if (list == null || list.isEmpty()) { - return null; - } else { - return list; - } - } - - private final List aggregations; - private final List cumulativeAggregations; - - @JsonCreator - public WindowAggregateProcessor( - @JsonProperty("aggregations") List aggregations, - @JsonProperty("cumulativeAggregations") List cumulativeAggregations - ) - { - this.aggregations = emptyToNull(aggregations); - this.cumulativeAggregations = emptyToNull(cumulativeAggregations); - } - - @JsonProperty("aggregations") - public List getAggregations() - { - return aggregations; - } - - @JsonProperty("cumulativeAggregations") - public List getCumulativeAggregations() - { - return cumulativeAggregations; - } - - @Override - public RowsAndColumns process(RowsAndColumns inputPartition) - { - AppendableRowsAndColumns retVal = RowsAndColumns.expectAppendable(inputPartition); - - if (aggregations != null) { - OnHeapAggregatable aggregatable = inputPartition.as(OnHeapAggregatable.class); - if (aggregatable == null) { - aggregatable = new DefaultOnHeapAggregatable(inputPartition); - } - final ArrayList aggregatedVals = aggregatable.aggregateAll(aggregations); - - for (int i = 0; i < aggregations.size(); ++i) { - final AggregatorFactory agg = aggregations.get(i); - retVal.addColumn( - agg.getName(), - new ConstantObjectColumn(aggregatedVals.get(i), inputPartition.numRows(), agg.getResultType()) - ); - } - } - - if (cumulativeAggregations != null) { - OnHeapCumulativeAggregatable cummulativeAgg = inputPartition.as(OnHeapCumulativeAggregatable.class); - if (cummulativeAgg == null) { - cummulativeAgg = new DefaultOnHeapAggregatable(inputPartition); - } - final ArrayList cumulativeVals = cummulativeAgg.aggregateCumulative(cumulativeAggregations); - - for (int i = 0; i < cumulativeAggregations.size(); ++i) { - final AggregatorFactory agg = cumulativeAggregations.get(i); - retVal.addColumn(agg.getName(), new ObjectArrayColumn(cumulativeVals.get(i), agg.getResultType())); - } - } - - return retVal; - } - - @Override - public boolean validateEquivalent(Processor otherProcessor) - { - if (otherProcessor instanceof WindowAggregateProcessor) { - WindowAggregateProcessor other = (WindowAggregateProcessor) otherProcessor; - return Objects.equals(aggregations, other.aggregations) - && Objects.equals(cumulativeAggregations, other.cumulativeAggregations); - } - return false; - } - - @Override - public String toString() - { - return "WindowAggregateProcessor{" + - "aggregations=" + aggregations + - ", cumulativeAggregations=" + cumulativeAggregations + - '}'; - } -} diff --git a/processing/src/main/java/org/apache/druid/query/operator/window/WindowFrame.java b/processing/src/main/java/org/apache/druid/query/operator/window/WindowFrame.java index e68760ac17e..b0dcc61415b 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/window/WindowFrame.java +++ b/processing/src/main/java/org/apache/druid/query/operator/window/WindowFrame.java @@ -26,6 +26,11 @@ import java.util.Objects; public class WindowFrame { + public static WindowFrame unbounded() + { + return new WindowFrame(PeerType.ROWS, true, 0, true, 0); + } + @SuppressWarnings("unused") public enum PeerType { diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/ArrayListRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/ArrayListRowsAndColumns.java index c58e8cde30a..073fad16a4b 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/ArrayListRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/ArrayListRowsAndColumns.java @@ -19,25 +19,68 @@ package org.apache.druid.query.rowsandcols; +import it.unimi.dsi.fastutil.Arrays; +import it.unimi.dsi.fastutil.ints.IntArrayList; +import it.unimi.dsi.fastutil.ints.IntComparator; +import it.unimi.dsi.fastutil.ints.IntList; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.query.operator.ColumnWithDirection; import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.query.rowsandcols.column.ColumnAccessor; +import org.apache.druid.query.rowsandcols.column.ColumnValueSwapper; +import org.apache.druid.query.rowsandcols.column.DefaultVectorCopier; +import org.apache.druid.query.rowsandcols.column.LimitedColumn; +import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn; import org.apache.druid.query.rowsandcols.column.ObjectColumnAccessorBase; +import org.apache.druid.query.rowsandcols.column.VectorCopier; +import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns; +import org.apache.druid.query.rowsandcols.semantic.ClusteredGroupPartitioner; +import org.apache.druid.query.rowsandcols.semantic.DefaultClusteredGroupPartitioner; +import org.apache.druid.query.rowsandcols.semantic.NaiveSortMaker; import org.apache.druid.segment.RowAdapter; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.function.Function; -public class ArrayListRowsAndColumns implements RowsAndColumns +/** + * ArrayListRowsAndColumns is a RowsAndColumns implementation that believes it has all of its data on-heap. + *

+ * It is an AppendableRowsAndColumns and, as with all RowsAndColumns, it is not thread-safe for multiple writes. + * Under certain circumstances, concurrent reads from multiple threads can be correct, but the code has to be follow + * a very strict ordering of code that ensures that reads only happen after writes and once a value is read, it will + * never be overwritten. + *

+ * Additionally, this object implements various of the semantic interfaces directly to provide some degree + * of processing and memory optimization. + * + * @param + */ +public class ArrayListRowsAndColumns implements AppendableRowsAndColumns { + @SuppressWarnings("rawtypes") + private static final HashMap, Function> AS_MAP = makeAsMap(); + private final ArrayList rows; private final RowAdapter rowAdapter; private final RowSignature rowSignature; + private final Map extraColumns; + private final Set columnNames; + private final int startOffset; + private final int endOffset; + public ArrayListRowsAndColumns( ArrayList rows, @@ -45,21 +88,49 @@ public class ArrayListRowsAndColumns implements RowsAndColumns RowSignature rowSignature ) { + this( + rows, + rowAdapter, + rowSignature, + new LinkedHashMap<>(), + new LinkedHashSet<>(rowSignature.getColumnNames()), + 0, + rows.size() + ); + } + + private ArrayListRowsAndColumns( + ArrayList rows, + RowAdapter rowAdapter, + RowSignature rowSignature, + Map extraColumns, + Set columnNames, + int startOffset, + int endOffset + ) + { + if (endOffset - startOffset < 0) { + throw new ISE("endOffset[%,d] - startOffset[%,d] was somehow negative!?", endOffset, startOffset); + } this.rows = rows; this.rowAdapter = rowAdapter; this.rowSignature = rowSignature; + this.extraColumns = extraColumns; + this.columnNames = columnNames; + this.startOffset = startOffset; + this.endOffset = endOffset; } @Override public Collection getColumnNames() { - return rowSignature.getColumnNames(); + return columnNames; } @Override public int numRows() { - return rows.size(); + return endOffset - startOffset; } @Override @@ -67,7 +138,11 @@ public class ArrayListRowsAndColumns implements RowsAndColumns public Column findColumn(String name) { if (!rowSignature.contains(name)) { - return null; + final Column retVal = extraColumns.get(name); + if (numRows() == rows.size()) { + return retVal; + } + return new LimitedColumn(retVal, startOffset, endOffset); } final Function adapterForValue = rowAdapter.columnFunction(name); @@ -77,6 +152,7 @@ public class ArrayListRowsAndColumns implements RowsAndColumns return new Column() { + @Nonnull @Override public ColumnAccessor toAccessor() { @@ -85,7 +161,7 @@ public class ArrayListRowsAndColumns implements RowsAndColumns @Override protected Object getVal(int rowNum) { - return adapterForValue.apply(rows.get(rowNum)); + return adapterForValue.apply(rows.get(startOffset + rowNum)); } @Override @@ -103,11 +179,12 @@ public class ArrayListRowsAndColumns implements RowsAndColumns @Override public int numRows() { - return rows.size(); + return endOffset - startOffset; } }; } + @Nullable @Override public T as(Class clazz) { @@ -118,8 +195,340 @@ public class ArrayListRowsAndColumns implements RowsAndColumns @Nullable @Override + @SuppressWarnings({"unchecked", "rawtypes"}) public T as(Class clazz) { - return null; + final Function fn = AS_MAP.get(clazz); + if (fn == null) { + return null; + } + return (T) fn.apply(this); + } + + + @Override + public void addColumn(String name, Column column) + { + if (rows.size() == numRows()) { + extraColumns.put(name, column); + columnNames.add(name); + return; + } + + // When an ArrayListRowsAndColumns is only a partial view, but adds a column, it believes that the same column + // will eventually be added for all of the rows so we pre-allocate storage for the entire set of data and + // copy. + + final ColumnAccessor columnAccessor = column.toAccessor(); + if (columnAccessor.numRows() != numRows()) { + throw new ISE("More rows[%,d] than expected[%,d]", columnAccessor.numRows(), numRows()); + } + + final Column extraColumn = extraColumns.get(name); + final ObjectArrayColumn existingColumn; + if (extraColumn == null) { + existingColumn = new ObjectArrayColumn(new Object[rows.size()], columnAccessor.getType()); + extraColumns.put(name, existingColumn); + columnNames.add(name); + } else if (extraColumn instanceof ObjectArrayColumn) { + existingColumn = (ObjectArrayColumn) extraColumn; + } else { + throw new ISE( + "Partial column[%s] was added, but already have full column[%s]", + column.getClass(), + extraColumn.getClass() + ); + } + + VectorCopier copier = column.as(VectorCopier.class); + if (copier == null) { + copier = new DefaultVectorCopier(columnAccessor); + } + + copier.copyInto(existingColumn.getObjects(), startOffset); + } + + private ArrayListRowsAndColumns limited(int startOffset, int endOffset) + { + return new ArrayListRowsAndColumns<>( + rows, + rowAdapter, + rowSignature, + extraColumns, + columnNames, + startOffset, + endOffset + ); + } + + public void sort(ArrayList ordering) + { + ArrayList comparators = new ArrayList<>(ordering.size()); + for (ColumnWithDirection columnWithDirection : ordering) { + final Column column = findColumn(columnWithDirection.getColumn()); + if (column != null) { + comparators.add(new IntComparator() + { + final ColumnAccessor accessor = column.toAccessor(); + private final int directionInt = columnWithDirection.getDirection().getDirectionInt(); + + @Override + public int compare(int lhs, int rhs) + { + return accessor.compareRows(lhs, rhs) * directionInt; + } + }); + } + } + + ArrayList swappers = new ArrayList<>(extraColumns.size()); + for (Map.Entry entry : extraColumns.entrySet()) { + final Column column = entry.getValue(); + final ColumnValueSwapper swapper = column.as(ColumnValueSwapper.class); + if (swapper == null) { + throw new ISE("Column[%s] of type[%s] cannot be sorted.", entry.getKey(), column.getClass()); + } + swappers.add(swapper); + } + + Arrays.quickSort( + 0, + rows.size(), + (lhs, rhs) -> { + for (IntComparator comparator : comparators) { + final int retVal = comparator.compare(lhs, rhs); + if (retVal != 0) { + return retVal; + } + } + return 0; + }, + (lhs, rhs) -> { + RowType tmp = rows.get(lhs); + rows.set(lhs, rows.get(rhs)); + rows.set(rhs, tmp); + + for (ColumnValueSwapper swapper : swappers) { + swapper.swapValues(lhs, rhs); + } + } + ); + } + + @SuppressWarnings("rawtypes") + private static HashMap, Function> makeAsMap() + { + HashMap, Function> retVal = new HashMap<>(); + + retVal.put( + ClusteredGroupPartitioner.class, + (Function) rac -> rac.new MyClusteredGroupPartitioner() + ); + + retVal.put( + NaiveSortMaker.class, + (Function) rac -> { + if (rac.startOffset != 0) { + throw new ISE( + "The NaiveSortMaker should happen on the first RAC, start was [%,d], end was [%,d]", + rac.startOffset, + rac.endOffset + ); + } + if (rac.endOffset == rac.rows.size()) { + // In this case, we are being sorted along with other RowsAndColumns objects, we don't have an optimized + // implementation for that, so just return null + //noinspection ReturnOfNull + return null; + } + + // When we are doing a naive sort and we are dealing with the first sub-window from ourselves, then we assume + // that we will see all of the other sub-windows as well, we can run through them and then sort the underlying + // rows at the very end. + return rac.new MyNaiveSortMaker(); + } + ); + + return retVal; + } + + private class MyClusteredGroupPartitioner implements ClusteredGroupPartitioner + { + @Override + public int[] computeBoundaries(List columns) + { + if (numRows() == 0) { + return new int[]{}; + } + + boolean allInSignature = true; + for (String column : columns) { + if (!rowSignature.contains(column)) { + allInSignature = false; + } + } + + if (allInSignature) { + return computeBoundariesAllInSignature(columns); + } else { + return computeBoundariesSomeAppended(columns); + } + } + + /** + * Computes boundaries assuming all columns are defined as in the signature. Given that + * ArrayListRowsAndColumns is a fundamentally row-oriented data structure, using a row-oriented + * algorithm should prove better than the column-oriented implementation in DefaultClusteredGroupPartitioner + * + * @param columns the columns to partition on as in {@link #computeBoundaries(List)} + * @return the partition boundaries as in {@link #computeBoundaries(List)} + */ + private int[] computeBoundariesAllInSignature(List columns) + { + ArrayList> comparators = new ArrayList<>(columns.size()); + ArrayList> adapters = new ArrayList<>(columns.size()); + for (String column : columns) { + final Optional columnType = rowSignature.getColumnType(column); + if (columnType.isPresent()) { + comparators.add(Comparator.nullsFirst(columnType.get().getStrategy())); + adapters.add(rowAdapter.columnFunction(column)); + } else { + throw new ISE("column didn't exist!? Other method should've been called..."); + } + } + + IntList boundaries = new IntArrayList(); + Object[] prevVal = new Object[comparators.size()]; + Object[] nextVal = new Object[comparators.size()]; + + int numRows = endOffset - startOffset; + + boundaries.add(0); + RowType currRow = rows.get(startOffset); + for (int i = 0; i < adapters.size(); ++i) { + prevVal[i] = adapters.get(i).apply(currRow); + } + + for (int i = 1; i < numRows; ++i) { + currRow = rows.get(startOffset + i); + for (int j = 0; j < adapters.size(); ++j) { + nextVal[j] = adapters.get(j).apply(currRow); + } + + for (int j = 0; j < comparators.size(); ++j) { + final int comparison = comparators.get(j).compare(prevVal[j], nextVal[j]); + if (comparison != 0) { + // Swap references + Object[] tmpRef = prevVal; + prevVal = nextVal; + nextVal = tmpRef; + + boundaries.add(i); + break; + } + } + } + boundaries.add(numRows); + + return boundaries.toIntArray(); + } + + /** + * Computes boundaries including some columns that were appended later. In this case, we are fundamentally + * mixing some row-oriented format with some column-oriented format. It's hard to determine if there's really + * an optimized form for this (or, really, it's hard to know if the optimized form would actually be worth + * the code complexity), so just fall back to the DefaultClusteredGroupPartitioner to compute these boundaries, + * the optimizations that come from re-using the ArrayListRowsAndColumns will continue to exist. + * + * @param columns the columns to partition on as in {@link #computeBoundaries(List)} + * @return the partition boundaries as in {@link #computeBoundaries(List)} + */ + private int[] computeBoundariesSomeAppended(List columns) + { + return new DefaultClusteredGroupPartitioner(ArrayListRowsAndColumns.this).computeBoundaries(columns); + } + + + @Override + public ArrayList partitionOnBoundaries(List partitionColumns) + { + final int[] boundaries = computeBoundaries(partitionColumns); + if (boundaries.length < 2) { + return new ArrayList<>(); + } + + ArrayList retVal = new ArrayList<>(boundaries.length - 1); + + for (int i = 1; i < boundaries.length; ++i) { + int start = boundaries[i - 1]; + int end = boundaries[i]; + retVal.add(limited(start, end)); + } + + return retVal; + } + } + + private class MyNaiveSortMaker implements NaiveSortMaker + { + + @Override + public NaiveSorter make(ArrayList ordering) + { + return new NaiveSorter() + { + private int currEnd = endOffset; + + @SuppressWarnings("unchecked") + @Override + public RowsAndColumns moreData(RowsAndColumns rac) + { + if (currEnd == rows.size()) { + // It's theoretically possible that this object is used in a place where it sees a bunch of parts from + // the same ArrayListRowsAndColumns and then, continues to receive more and more RowsAndColumns objects + // from other ArrayListRowsAndColumns. In that case, we can + // 1. do a localized sort + // 2. continue to build up the other objects + // 3. once all objects are complete, do a merge-sort between them and return that + // + // This is not currently implemented, however, as the code cannot actually ever generate that sequence + // of objects. Additionally, if the code ever did generate that sequence, the proper solution could be + // to implement something else differently (perhaps use a different type of RowsAndColumns entirely). + // As such, we leave this implementation as an exercise for the future when it is better known why the + // code needed to work with this specific series of concrete objects. + throw new ISE("More data came after completing the ArrayList, not supported yet."); + } + + if (rac instanceof ArrayListRowsAndColumns) { + ArrayListRowsAndColumns arrayRac = (ArrayListRowsAndColumns) rac; + if (arrayRac.startOffset != currEnd) { + throw new ISE( + "ArrayRAC instances seen out-of-order!? currEnd[%,d], arrayRac[%,d][%,d]", + currEnd, + arrayRac.startOffset, + arrayRac.endOffset + ); + } + currEnd = arrayRac.endOffset; + + return null; + } else { + throw new ISE("Expected an ArrayListRowsAndColumns, got[%s], fall back to default?", rac.getClass()); + } + } + + @Override + public RowsAndColumns complete() + { + if (currEnd != rows.size()) { + throw new ISE("Didn't see all of the rows? currEnd[%,d], rows.size()[%,d]", currEnd, rows.size()); + } + + final ArrayListRowsAndColumns retVal = limited(0, rows.size()); + retVal.sort(ordering); + return retVal; + } + }; + } } } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/ConcatRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/ConcatRowsAndColumns.java index e9ae8d75c61..523a982fbaf 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/ConcatRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/ConcatRowsAndColumns.java @@ -22,6 +22,7 @@ package org.apache.druid.query.rowsandcols; import org.apache.druid.java.util.common.ISE; import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.query.rowsandcols.column.ColumnAccessor; +import org.apache.druid.query.rowsandcols.column.NullColumn; import org.apache.druid.segment.column.ColumnType; import javax.annotation.Nonnull; @@ -33,6 +34,13 @@ import java.util.Comparator; import java.util.LinkedHashMap; import java.util.Map; +/** + * A RowsAndColumns implementation that effectively concatenates multiple RowsAndColumns objects together. + *

+ * That is, if it is given 3 RowsAndColumns objects, of sizes 4, 2, and 3 respectively, the resulting + * RowsAndColumns object will have 9 rows where rows 0-to-3 will be from the first object, 4-to-5 from the next and + * 6-to-8 from the last. + */ public class ConcatRowsAndColumns implements RowsAndColumns { private final ArrayList racBuffer; @@ -101,7 +109,11 @@ public class ConcatRowsAndColumns implements RowsAndColumns final ColumnType type = firstAccessor.getType(); for (int i = 1; i < racBuffer.size(); ++i) { RowsAndColumns rac = racBuffer.get(i); - final Column col = rac.findColumn(name); + Column col = rac.findColumn(name); + if (col == null) { + // It doesn't exist, so must be all null! + col = new NullColumn(type, rac.numRows()); + } final ColumnAccessor accessor = col.toAccessor(); if (!type.equals(accessor.getType())) { throw new ISE("Type mismatch, expected[%s], got[%s] on entry[%,d]", type, accessor.getType(), i); @@ -109,7 +121,11 @@ public class ConcatRowsAndColumns implements RowsAndColumns accessors.add(accessor); } - final ConcatedidColumn retVal = new ConcatedidColumn(type, type.getStrategy(), accessors); + final ConcatedidColumn retVal = new ConcatedidColumn( + type, + Comparator.nullsFirst(type.getStrategy()), + accessors + ); columnCache.put(name, retVal); return retVal; } @@ -232,6 +248,7 @@ public class ConcatRowsAndColumns implements RowsAndColumns }; } + @Nullable @Override public T as(Class clazz) { diff --git a/processing/src/main/java/org/apache/druid/query/operator/LimitedRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/LimitedRowsAndColumns.java similarity index 61% rename from processing/src/main/java/org/apache/druid/query/operator/LimitedRowsAndColumns.java rename to processing/src/main/java/org/apache/druid/query/rowsandcols/LimitedRowsAndColumns.java index 95b642f69c2..abb3d4649b1 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/LimitedRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/LimitedRowsAndColumns.java @@ -17,13 +17,11 @@ * under the License. */ -package org.apache.druid.query.operator; +package org.apache.druid.query.rowsandcols; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.query.operator.window.value.ShiftedColumnAccessorBase; -import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.column.Column; -import org.apache.druid.query.rowsandcols.column.ColumnAccessor; +import org.apache.druid.query.rowsandcols.column.LimitedColumn; import javax.annotation.Nullable; import java.util.Collection; @@ -66,44 +64,7 @@ public class LimitedRowsAndColumns implements RowsAndColumns return null; } - return new Column() - { - @Override - public ColumnAccessor toAccessor() - { - final ColumnAccessor columnAccessor = column.toAccessor(); - return new ShiftedColumnAccessorBase(columnAccessor) - { - @Override - public int numRows() - { - return end - start; - } - - @Override - protected int getActualValue(int rowNum) - { - int retVal = start + rowNum; - if (retVal >= end) { - throw new ISE("Index out of bounds[%d] >= [%d], start[%s]", retVal, end, start); - } - return retVal; - } - - @Override - protected boolean outsideBounds(int rowNum) - { - return false; - } - }; - } - - @Override - public T as(Class clazz) - { - return null; - } - }; + return new LimitedColumn(column, start, end); } @Nullable @@ -112,4 +73,5 @@ public class LimitedRowsAndColumns implements RowsAndColumns { return null; } + } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java index 31548c66bf6..e320ac1999c 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java @@ -40,14 +40,15 @@ public class MapOfColumnsRowsAndColumns implements RowsAndColumns return fromMap(ImmutableMap.of(name, col, name2, col2)); } - public static MapOfColumnsRowsAndColumns fromMap(Map map) + @SuppressWarnings("unchecked") + public static MapOfColumnsRowsAndColumns fromMap(Map map) { if (map == null || map.isEmpty()) { throw new ISE("map[%s] cannot be null or empty.", map); } - final Iterator> iter = map.entrySet().iterator(); - Map.Entry entry = iter.next(); + final Iterator> iter = map.entrySet().iterator(); + Map.Entry entry = iter.next(); int numCells = entry.getValue().toAccessor().numRows(); if (iter.hasNext()) { entry = iter.next(); @@ -62,7 +63,10 @@ public class MapOfColumnsRowsAndColumns implements RowsAndColumns } } - return new MapOfColumnsRowsAndColumns(map, map.values().iterator().next().toAccessor().numRows()); + return new MapOfColumnsRowsAndColumns( + (Map) map, + map.values().iterator().next().toAccessor().numRows() + ); } private final Map mapOfColumns; diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/OnHeapCumulativeAggregatable.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/OnHeapCumulativeAggregatable.java deleted file mode 100644 index a931c3dbcab..00000000000 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/OnHeapCumulativeAggregatable.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query.rowsandcols; - -import org.apache.druid.query.aggregation.AggregatorFactory; - -import java.util.ArrayList; -import java.util.List; - -/** - * A semantic interface used to cumulatively aggregate a list of AggregatorFactories across a given set of data - *

- * The aggregation specifically happens on-heap and should be used in places where it is known that the data - * set can be worked with entirely on-heap. - *

- * Note, as we implement frame-handling for window aggregations, it is expected that this interface will undergo a - * transformation. It might be deleted and replaced with something else, or might just see a change done in place. - * Either way, there is no assumption of enforced compatibility with this interface at this point in time. - */ -public interface OnHeapCumulativeAggregatable -{ - /** - * Cumulatively aggregates the data using the {@code List aggregateCumulative(List aggFactories); -} diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java index 4caa4fda0da..3476bab60c8 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java @@ -30,6 +30,19 @@ import java.util.Collection; import java.util.LinkedHashMap; import java.util.Map; +/** + * This class exists to "decorate" a rows and columns such that it pretends to exist in a new ordering. + *

+ * The constructor generally takes an int[] of pointers, these pointers are used to re-map the rows from the + * RowsAndColumns object. That is, if the RowsAndColumns has 4 rows and the array {@code new int[]{3, 1, 2, 0}} + * is passed in, then the order of traverals of the rows will be {@code 3 -> 1 -> 2 -> 0}. + *

+ * This can be useful for sorting potentially immutable data, as the list of pointers can identify the order + * that the rows should be traversed in. It can also be used for clustering like-data together. + *

+ * While this avoids a copy, in cases where the data will be iterated regularly, it also generates a random-access + * pattern that is not always optimal. + */ public class RearrangedRowsAndColumns implements RowsAndColumns { private final Map columnCache = new LinkedHashMap<>(); diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java index d49a14ae61f..7b6a1f6215d 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java @@ -21,7 +21,7 @@ package org.apache.druid.query.rowsandcols; import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns; -import org.apache.druid.query.rowsandcols.semantic.OnHeapAggregatable; +import org.apache.druid.query.rowsandcols.semantic.FramedOnHeapAggregatable; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -39,8 +39,8 @@ import java.util.Collection; * with a Rows and columns, we rely on semantic interfaces using the {@link RowsAndColumns#as} method instead. *

* That is, the expectation is that anything that works with a RowsAndColumns will tend to first ask the RowsAndColumns - * object to become some other interface, for example, an {@link OnHeapAggregatable}. If a RowsAndColumns knows how - * to do a good job as the requested interface, it can return its own concrete implementation of the interface and + * object to become some other interface, for example, a {@link FramedOnHeapAggregatable}. If a RowsAndColumns knows + * how to do a good job as the requested interface, it can return its own concrete implementation of the interface and * run the necessary logic in its own optimized fashion. If the RowsAndColumns instance does not know how to implement * the semantic interface, it is expected that a default implementation of the interface can be instantiated on top of * the default column access mechanisms that the RowsAndColumns provides. Such default implementations should be diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/Column.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/Column.java index 97327faf9ce..957df530211 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/Column.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/Column.java @@ -20,6 +20,7 @@ package org.apache.druid.query.rowsandcols.column; import javax.annotation.Nonnull; +import javax.annotation.Nullable; /** * An interface representing a Column of data. @@ -37,7 +38,7 @@ public interface Column { /** * Returns the column as a {@link ColumnAccessor}. Semantically, this would be equivalent to calling - * {@Code Column.as(ColumnAccessor.class)}. However, being able to implement this interface is part of the explicit + * {@code Column.as(ColumnAccessor.class)}. However, being able to implement this interface is part of the explicit * contract of implementing this interface, so instead of relying on {@link #as} which allows for returning null, * we define a top-level method that should never return null. * @@ -58,6 +59,6 @@ public interface Column * @return A concrete implementation of the interface, or null if there is no meaningful optimization to be had * through a local implementation of the interface. */ - @SuppressWarnings("unused") + @Nullable T as(Class clazz); } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ColumnAccessorBasedColumn.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ColumnAccessorBasedColumn.java index 2e6a4a597ce..bff32750b70 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ColumnAccessorBasedColumn.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ColumnAccessorBasedColumn.java @@ -19,6 +19,9 @@ package org.apache.druid.query.rowsandcols.column; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + public class ColumnAccessorBasedColumn implements Column { private final ColumnAccessor base; @@ -30,12 +33,14 @@ public class ColumnAccessorBasedColumn implements Column this.base = base; } + @Nonnull @Override public ColumnAccessor toAccessor() { return base; } + @Nullable @Override public T as(Class clazz) { diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ColumnValueSwapper.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ColumnValueSwapper.java new file mode 100644 index 00000000000..f160c744fb6 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ColumnValueSwapper.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.rowsandcols.column; + +/** + * A semantic interface for use with {@link Column} objects. + * + * This is used to swap values inside of a column. Note that this interface fundamentally mutates the underlying + * column. If a column cannot support mutation, it should not return return an implementation of this interface. + */ +public interface ColumnValueSwapper +{ + /** + * Swaps the values at the two row ids. There is no significant to "right" and "left", it's just easier to name + * the parameters that way. + * + * @param lhs the left-hand-side rowId + * @param rhs the right-hand-side rowId + */ + void swapValues(int lhs, int rhs); +} diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ConstantObjectColumn.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ConstantObjectColumn.java index e7c778dc8a5..d054472f3d3 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ConstantObjectColumn.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ConstantObjectColumn.java @@ -19,21 +19,27 @@ package org.apache.druid.query.rowsandcols.column; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.segment.column.ColumnType; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.Arrays; + public class ConstantObjectColumn implements Column { private final Object obj; - private final int numCells; + private final int numRows; private final ColumnType type; - public ConstantObjectColumn(Object obj, int numCells, ColumnType type) + public ConstantObjectColumn(Object obj, int numRows, ColumnType type) { this.obj = obj; - this.numCells = numCells; + this.numRows = numRows; this.type = type; } + @Nonnull @Override public ColumnAccessor toAccessor() { @@ -48,7 +54,7 @@ public class ConstantObjectColumn implements Column @Override public int numRows() { - return numCells; + return numRows; } @Override @@ -95,9 +101,24 @@ public class ConstantObjectColumn implements Column }; } + @Nullable + @SuppressWarnings("unchecked") @Override public T as(Class clazz) { + if (VectorCopier.class.equals(clazz)) { + return (T) (VectorCopier) (into, intoStart) -> { + if (Integer.MAX_VALUE - numRows < intoStart) { + throw new ISE("too many rows!!! intoStart[%,d], numRows[%,d] combine to exceed max_int", intoStart, numRows); + } + Arrays.fill(into, intoStart, intoStart + numRows, obj); + }; + } + if (ColumnValueSwapper.class.equals(clazz)) { + return (T) (ColumnValueSwapper) (lhs, rhs) -> { + }; + } + return null; } } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/DefaultVectorCopier.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/DefaultVectorCopier.java new file mode 100644 index 00000000000..bd31b543d2a --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/DefaultVectorCopier.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.rowsandcols.column; + +import org.apache.druid.java.util.common.ISE; + +public class DefaultVectorCopier implements VectorCopier +{ + ColumnAccessor accessor; + + public DefaultVectorCopier(ColumnAccessor accessor) + { + this.accessor = accessor; + } + + @Override + public void copyInto(Object[] into, int intoStart) + { + final int numRows = accessor.numRows(); + if (Integer.MAX_VALUE - numRows < intoStart) { + throw new ISE("too many rows!!! intoStart[%,d], numRows[%,d] combine to exceed max_int", intoStart, numRows); + } + + for (int i = 0; i < numRows; ++i) { + into[intoStart + i] = accessor.getObject(i); + } + } +} diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/DoubleArrayColumn.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/DoubleArrayColumn.java index 860ec4d10f3..c87e69f5009 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/DoubleArrayColumn.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/DoubleArrayColumn.java @@ -19,8 +19,12 @@ package org.apache.druid.query.rowsandcols.column; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.segment.column.ColumnType; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + public class DoubleArrayColumn implements Column { private final double[] vals; @@ -32,6 +36,7 @@ public class DoubleArrayColumn implements Column this.vals = vals; } + @Nonnull @Override public ColumnAccessor toAccessor() { @@ -93,9 +98,32 @@ public class DoubleArrayColumn implements Column }; } + @Nullable + @SuppressWarnings("unchecked") @Override public T as(Class clazz) { + if (VectorCopier.class.equals(clazz)) { + return (T) (VectorCopier) (into, intoStart) -> { + if (Integer.MAX_VALUE - vals.length < intoStart) { + throw new ISE( + "too many rows!!! intoStart[%,d], vals.length[%,d] combine to exceed max_int", + intoStart, + vals.length + ); + } + for (int i = 0; i < vals.length; ++i) { + into[intoStart + i] = vals[i]; + } + }; + } + if (ColumnValueSwapper.class.equals(clazz)) { + return (T) (ColumnValueSwapper) (lhs, rhs) -> { + double tmp = vals[lhs]; + vals[lhs] = vals[rhs]; + vals[rhs] = tmp; + }; + } return null; } } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/IntArrayColumn.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/IntArrayColumn.java index 9e3745d153c..ee5a7d76046 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/IntArrayColumn.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/IntArrayColumn.java @@ -19,8 +19,12 @@ package org.apache.druid.query.rowsandcols.column; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.segment.column.ColumnType; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + public class IntArrayColumn implements Column { private final int[] vals; @@ -32,6 +36,7 @@ public class IntArrayColumn implements Column this.vals = vals; } + @Nonnull @Override public ColumnAccessor toAccessor() { @@ -93,9 +98,32 @@ public class IntArrayColumn implements Column }; } + @Nullable + @SuppressWarnings("unchecked") @Override public T as(Class clazz) { + if (VectorCopier.class.equals(clazz)) { + return (T) (VectorCopier) (into, intoStart) -> { + if (Integer.MAX_VALUE - vals.length < intoStart) { + throw new ISE( + "too many rows!!! intoStart[%,d], vals.length[%,d] combine to exceed max_int", + intoStart, + vals.length + ); + } + for (int i = 0; i < vals.length; ++i) { + into[intoStart + i] = vals[i]; + } + }; + } + if (ColumnValueSwapper.class.equals(clazz)) { + return (T) (ColumnValueSwapper) (lhs, rhs) -> { + int tmp = vals[lhs]; + vals[lhs] = vals[rhs]; + vals[rhs] = tmp; + }; + } return null; } } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/LimitedColumn.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/LimitedColumn.java new file mode 100644 index 00000000000..7b1c2e17736 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/LimitedColumn.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.rowsandcols.column; + +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.query.operator.window.value.ShiftedColumnAccessorBase; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +public class LimitedColumn implements Column +{ + private final Column column; + private final int start; + private final int end; + + public LimitedColumn(Column column, int start, int end) + { + this.column = column; + this.start = start; + this.end = end; + } + + @Nonnull + @Override + public ColumnAccessor toAccessor() + { + final ColumnAccessor columnAccessor = column.toAccessor(); + return new ShiftedColumnAccessorBase(columnAccessor) + { + @Override + public int numRows() + { + return end - start; + } + + @Override + protected int getActualValue(int rowNum) + { + int retVal = start + rowNum; + if (retVal >= end) { + throw new ISE("Index out of bounds[%d] >= [%d], start[%s]", retVal, end, start); + } + return retVal; + } + + @Override + protected boolean outsideBounds(int rowNum) + { + return false; + } + }; + } + + @Nullable + @Override + public T as(Class clazz) + { + return null; + } +} diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/NullColumn.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/NullColumn.java new file mode 100644 index 00000000000..73823534bec --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/NullColumn.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.rowsandcols.column; + +import org.apache.druid.segment.column.ColumnType; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +public class NullColumn implements Column +{ + private final ColumnType type; + private final int numRows; + + public NullColumn( + ColumnType type, + int numRows + ) + { + this.type = type; + this.numRows = numRows; + } + + @Nonnull + @Override + public ColumnAccessor toAccessor() + { + return new Accessor(type, numRows); + } + + @Nullable + @Override + public T as(Class clazz) + { + return null; + } + + public static class Accessor implements ColumnAccessor + { + private final ColumnType type; + private final int size; + + public Accessor(ColumnType type, int size) + { + this.type = type; + this.size = size; + } + + @Override + public ColumnType getType() + { + return type; + } + + @Override + public int numRows() + { + return size; + } + + @Override + public boolean isNull(int rowNum) + { + return true; + } + + @Nullable + @Override + public Object getObject(int rowNum) + { + return null; + } + + @Override + public double getDouble(int rowNum) + { + return 0; + } + + @Override + public float getFloat(int rowNum) + { + return 0; + } + + @Override + public long getLong(int rowNum) + { + return 0; + } + + @Override + public int getInt(int rowNum) + { + return 0; + } + + @Override + public int compareRows(int lhsRowNum, int rhsRowNum) + { + return 0; + } + } +} diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/NullColumnAccessor.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/NullColumnAccessor.java deleted file mode 100644 index a51a861a336..00000000000 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/NullColumnAccessor.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query.rowsandcols.column; - -import org.apache.druid.segment.column.ColumnType; - -import javax.annotation.Nullable; - -public class NullColumnAccessor implements ColumnAccessor -{ - private final ColumnType type; - private final int size; - - public NullColumnAccessor(int size) - { - this(ColumnType.UNKNOWN_COMPLEX, size); - } - - public NullColumnAccessor(ColumnType type, int size) - { - this.type = type; - this.size = size; - } - - @Override - public ColumnType getType() - { - return type; - } - - @Override - public int numRows() - { - return size; - } - - @Override - public boolean isNull(int rowNum) - { - return true; - } - - @Nullable - @Override - public Object getObject(int rowNum) - { - return null; - } - - @Override - public double getDouble(int rowNum) - { - return 0; - } - - @Override - public float getFloat(int rowNum) - { - return 0; - } - - @Override - public long getLong(int rowNum) - { - return 0; - } - - @Override - public int getInt(int rowNum) - { - return 0; - } - - @Override - public int compareRows(int lhsRowNum, int rhsRowNum) - { - return 0; - } -} diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ObjectArrayColumn.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ObjectArrayColumn.java index 25e5ed2841b..46d875286a7 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ObjectArrayColumn.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/ObjectArrayColumn.java @@ -21,6 +21,8 @@ package org.apache.druid.query.rowsandcols.column; import org.apache.druid.segment.column.ColumnType; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.util.Comparator; public class ObjectArrayColumn implements Column @@ -41,6 +43,19 @@ public class ObjectArrayColumn implements Column this.comparator = comparator; } + /** + * Gets the underlying object array. This method is exposed on the concrete class explicitly to allow for + * mutation. This class does absolutely nothing to ensure that said mutation of the array is valid. It is up + * to the caller that is choosing to do this mutation to make sure that it is safe. + * + * @return the object array backing this column + */ + public Object[] getObjects() + { + return objects; + } + + @Nonnull @Override public ColumnAccessor toAccessor() { @@ -72,9 +87,21 @@ public class ObjectArrayColumn implements Column }; } + @Nullable + @SuppressWarnings("unchecked") @Override public T as(Class clazz) { + if (VectorCopier.class.equals(clazz)) { + return (T) (VectorCopier) (into, intoStart) -> System.arraycopy(objects, 0, into, intoStart, objects.length); + } + if (ColumnValueSwapper.class.equals(clazz)) { + return (T) (ColumnValueSwapper) (lhs, rhs) -> { + Object tmp = objects[lhs]; + objects[lhs] = objects[rhs]; + objects[rhs] = tmp; + }; + } return null; } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/column/VectorCopier.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/VectorCopier.java new file mode 100644 index 00000000000..7dace3059f5 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/column/VectorCopier.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.rowsandcols.column; + +/** + * A semantic interface for use with {@link Column} objects. Has methods to request that the {@link Column} copy + * values into provided array structures. Note that this interface is primarily useful to read from + * {@link Column} objects and not to write to them. + */ +public interface VectorCopier +{ + /** + * Copies all values from the underlying column into the {@code Object[]} passed into this method. + * + * @param into the object array to store the values + * @param intoStart the index of the into array to start writing into. + */ + void copyInto(Object[] into, int intoStart); +} diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/ClusteredGroupPartitioner.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/ClusteredGroupPartitioner.java index 7536686fc20..f84ebd8d495 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/ClusteredGroupPartitioner.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/ClusteredGroupPartitioner.java @@ -32,6 +32,15 @@ import java.util.List; */ public interface ClusteredGroupPartitioner { + static ClusteredGroupPartitioner fromRAC(RowsAndColumns rac) + { + ClusteredGroupPartitioner retVal = rac.as(ClusteredGroupPartitioner.class); + if (retVal == null) { + retVal = new DefaultClusteredGroupPartitioner(rac); + } + return retVal; + } + /** * Computes and returns a list of contiguous boundaries for independent groups. All rows in a specific grouping * should have the same values for the identified columns. Additionally, as this is assuming it is dealing with diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultClusteredGroupPartitioner.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultClusteredGroupPartitioner.java index 959e6d2cb08..0d4425ee714 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultClusteredGroupPartitioner.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultClusteredGroupPartitioner.java @@ -21,7 +21,7 @@ package org.apache.druid.query.rowsandcols.semantic; import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.ints.IntList; -import org.apache.druid.query.operator.LimitedRowsAndColumns; +import org.apache.druid.query.rowsandcols.LimitedRowsAndColumns; import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.query.rowsandcols.column.ColumnAccessor; @@ -43,6 +43,10 @@ public class DefaultClusteredGroupPartitioner implements ClusteredGroupPartition @Override public int[] computeBoundaries(List columns) { + if (rac.numRows() == 0) { + return new int[]{}; + } + // Initialize to a grouping of everything IntList boundaries = new IntArrayList(new int[]{0, rac.numRows()}); @@ -78,6 +82,10 @@ public class DefaultClusteredGroupPartitioner implements ClusteredGroupPartition public ArrayList partitionOnBoundaries(List partitionColumns) { final int[] boundaries = computeBoundaries(partitionColumns); + if (boundaries.length < 2) { + return new ArrayList<>(); + } + ArrayList retVal = new ArrayList<>(boundaries.length - 1); for (int i = 1; i < boundaries.length; ++i) { diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java index 2b7798a99c6..a6c9642c3ba 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java @@ -35,6 +35,7 @@ import org.apache.druid.segment.ObjectColumnSelector; import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnCapabilitiesImpl; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.Arrays; import java.util.concurrent.atomic.AtomicInteger; @@ -50,6 +51,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable this.rac = rac; } + @Nonnull @Override public RowsAndColumns aggregateAll( WindowFrame frame, @@ -299,12 +301,10 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable int upperOffset ) { - /** - * There are 3 different phases of operation when we have more rows than our window size - * 1. Our window is not full, as we walk the rows we build up towards filling it - * 2. Our window is full, as we walk the rows we take a value off and add a new aggregation - * 3. We are nearing the end of the rows, we need to start shrinking the window aperture - */ + // There are 3 different phases of operation when we have more rows than our window size + // 1. Our window is not full, as we walk the rows we build up towards filling it + // 2. Our window is full, as we walk the rows we take a value off and add a new aggregation + // 3. We are nearing the end of the rows, we need to start shrinking the window aperture int numRows = rac.numRows(); int windowSize = lowerOffset + upperOffset + 1; @@ -434,15 +434,16 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable int upperOffset ) { - int numRows = rac.numRows(); - int windowSize = numRows; + // In this case, we need to store a value for all items, so our windowSize is equivalent to the number of rows + // from the RowsAndColumns object that we are using. + int windowSize = rac.numRows(); // We store the results in an Object array for convenience. This is definitely sub-par from a memory management // point of view as we should use native arrays when possible. This will be fine for now, but it probably makes // sense to look at optimizing this in the future. That said, such an optimization might best come by having // a specialized implementation of this interface against, say, a Frame object that can deal with arrays instead // of trying to optimize this generic implementation. - Object[][] results = new Object[aggFactories.length][numRows]; + Object[][] results = new Object[aggFactories.length][windowSize]; int resultStorageIndex = 0; AtomicInteger rowIdProvider = new AtomicInteger(0); @@ -458,11 +459,11 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable // This is the index to stop at for the current window aperture // The first row is used by all of the results for the lowerOffset num results, plus 1 for the "current row" - int stopIndex = Math.min(lowerOffset + 1, numRows); + int stopIndex = Math.min(lowerOffset + 1, windowSize); int startIndex = 0; int rowId = rowIdProvider.get(); - while (rowId < numRows) { + while (rowId < windowSize) { for (Aggregator[] aggregator : aggregators) { for (int j = startIndex; j < stopIndex; ++j) { aggregator[j].aggregate(); @@ -540,13 +541,16 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable } @Override - public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) + @Nonnull + public DimensionSelector makeDimensionSelector(@Nonnull DimensionSpec dimensionSpec) { throw new UOE("combining factory shouldn't need dimensions, just columnValue, dim[%s]", dimensionSpec); } + @SuppressWarnings("rawtypes") @Override - public ColumnValueSelector makeColumnValueSelector(String columnName) + @Nonnull + public ColumnValueSelector makeColumnValueSelector(@Nonnull String columnName) { return new ObjectColumnSelector() { @@ -564,6 +568,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable } @Override + @Nonnull public Class classOfObject() { return results[index].getClass(); @@ -573,7 +578,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable @Nullable @Override - public ColumnCapabilities getColumnCapabilities(String column) + public ColumnCapabilities getColumnCapabilities(@Nonnull String column) { return columnCapabilities; } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultOnHeapAggregatable.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultOnHeapAggregatable.java deleted file mode 100644 index 9f5abbce6d9..00000000000 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultOnHeapAggregatable.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query.rowsandcols.semantic; - -import org.apache.druid.query.aggregation.Aggregator; -import org.apache.druid.query.aggregation.AggregatorFactory; -import org.apache.druid.query.rowsandcols.OnHeapCumulativeAggregatable; -import org.apache.druid.query.rowsandcols.RowsAndColumns; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; - -public class DefaultOnHeapAggregatable implements OnHeapAggregatable, OnHeapCumulativeAggregatable -{ - private final RowsAndColumns rac; - - public DefaultOnHeapAggregatable( - RowsAndColumns rac - ) - { - this.rac = rac; - } - - @Override - public ArrayList aggregateAll( - List aggFactories - ) - { - Aggregator[] aggs = new Aggregator[aggFactories.size()]; - - AtomicInteger currRow = new AtomicInteger(0); - int index = 0; - for (AggregatorFactory aggFactory : aggFactories) { - aggs[index++] = aggFactory.factorize(new DefaultColumnSelectorFactoryMaker.ColumnAccessorBasedColumnSelectorFactory( - currRow, - rac - )); - } - - int numRows = rac.numRows(); - int rowId = currRow.get(); - while (rowId < numRows) { - for (Aggregator agg : aggs) { - agg.aggregate(); - } - rowId = currRow.incrementAndGet(); - } - - ArrayList retVal = new ArrayList<>(aggs.length); - for (Aggregator agg : aggs) { - retVal.add(agg.get()); - agg.close(); - } - return retVal; - } - - @Override - public ArrayList aggregateCumulative(List aggFactories) - { - Aggregator[] aggs = new Aggregator[aggFactories.size()]; - ArrayList retVal = new ArrayList<>(aggFactories.size()); - - int numRows = rac.numRows(); - AtomicInteger currRow = new AtomicInteger(0); - int index = 0; - for (AggregatorFactory aggFactory : aggFactories) { - aggs[index++] = aggFactory.factorize(new DefaultColumnSelectorFactoryMaker.ColumnAccessorBasedColumnSelectorFactory( - currRow, - rac - )); - retVal.add(new Object[numRows]); - } - - int rowId = currRow.get(); - while (rowId < numRows) { - for (int i = 0; i < aggs.length; ++i) { - aggs[i].aggregate(); - retVal.get(i)[rowId] = aggs[i].get(); - } - rowId = currRow.incrementAndGet(); - } - - for (Aggregator agg : aggs) { - agg.close(); - } - - return retVal; - } - -} diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/FramedOnHeapAggregatable.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/FramedOnHeapAggregatable.java index deabe54338f..3a3e9130a17 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/FramedOnHeapAggregatable.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/FramedOnHeapAggregatable.java @@ -23,6 +23,8 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.operator.window.WindowFrame; import org.apache.druid.query.rowsandcols.RowsAndColumns; +import javax.annotation.Nonnull; + /** * A semantic interface used to aggregate a list of AggregatorFactories across a given set of data *

@@ -53,5 +55,6 @@ public interface FramedOnHeapAggregatable * @return a RowsAndColumns that contains columns representing the results of the aggregation * from the AggregatorFactories */ + @Nonnull RowsAndColumns aggregateAll(WindowFrame frame, AggregatorFactory[] aggFactories); } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/NaiveSortMaker.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/NaiveSortMaker.java index 912c114c17c..32bea5331fc 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/NaiveSortMaker.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/NaiveSortMaker.java @@ -23,6 +23,7 @@ import com.google.common.collect.Lists; import org.apache.druid.query.operator.ColumnWithDirection; import org.apache.druid.query.rowsandcols.RowsAndColumns; +import javax.annotation.Nullable; import java.util.ArrayList; /** @@ -53,6 +54,7 @@ public interface NaiveSortMaker * @param rac the data to include in the sort * @return optionally, a RowsAndColumns object of data that is known to be in sorted order, null if nothing yet. */ + @Nullable RowsAndColumns moreData(RowsAndColumns rac); /** @@ -60,15 +62,12 @@ public interface NaiveSortMaker * * @return A RowsAndColumns object of sorted data that has not been returned already from {@link #moreData} calls. */ + @Nullable RowsAndColumns complete(); } /** - * Makes the NaiveSorter that will actually do the sort. This method uses {@code List} to avoid - * littering the code with extra objects for the same thing. {@code OrderByColumnSpec} is only used to identify - * which column should be sorted and in which direction. Specifically, it has a "dimensionComparator" field which - * seems to indicate that it's possible to provide a specific comparator ordering, this should be completely ignored - * by implementations of the NaiveSorter interface. + * Makes the NaiveSorter that will actually do the sort. * * @param ordering a specification of which columns to sort in which direction * @return a NaiveSorter that will sort according to the provided spec diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/OnHeapAggregatable.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/OnHeapAggregatable.java deleted file mode 100644 index 20421dfea2d..00000000000 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/OnHeapAggregatable.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query.rowsandcols.semantic; - -import org.apache.druid.query.aggregation.AggregatorFactory; - -import java.util.ArrayList; -import java.util.List; - -/** - * A semantic interface used to aggregate a list of AggregatorFactories across a given set of data - *

- * The aggregation specifically happens on-heap and should be used in places where it is known that the data - * set can be worked with entirely on-heap. - *

- * Note, as we implement frame-handling for window aggregations, it is expected that this interface will undergo a - * transformation. It might be deleted and replaced with something else, or might just see a change done in place. - * Either way, there is no assumption of enforced compatibility with this interface at this point in time. - */ -public interface OnHeapAggregatable -{ - /** - * Aggregates the data using the {@code List aggregateAll(List aggFactories); -} diff --git a/processing/src/test/java/org/apache/druid/query/operator/window/RowsAndColumnsHelper.java b/processing/src/test/java/org/apache/druid/query/operator/window/RowsAndColumnsHelper.java index b837a4803dd..b9781b12d74 100644 --- a/processing/src/test/java/org/apache/druid/query/operator/window/RowsAndColumnsHelper.java +++ b/processing/src/test/java/org/apache/druid/query/operator/window/RowsAndColumnsHelper.java @@ -20,6 +20,7 @@ package org.apache.druid.query.operator.window; import com.google.common.collect.ImmutableSet; +import it.unimi.dsi.fastutil.ints.IntArrayList; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.rowsandcols.RowsAndColumns; @@ -100,15 +101,23 @@ public class RowsAndColumnsHelper public RowsAndColumnsHelper expectColumn(String col, ColumnType type, Object... expectedVals) { - final ColumnHelper helper = columnHelper(col, expectedVals.length, type); - helper.setExpectation(expectedVals); - return this; + return expectColumn(col, expectedVals, type); } public RowsAndColumnsHelper expectColumn(String col, Object[] expectedVals, ColumnType type) { + IntArrayList nullPositions = new IntArrayList(); + for (int i = 0; i < expectedVals.length; i++) { + if (expectedVals[i] == null) { + nullPositions.add(i); + } + } + final ColumnHelper helper = columnHelper(col, expectedVals.length, type); helper.setExpectation(expectedVals); + if (!nullPositions.isEmpty()) { + helper.setNulls(nullPositions.toIntArray()); + } return this; } @@ -159,7 +168,9 @@ public class RowsAndColumnsHelper } for (Map.Entry entry : helpers.entrySet()) { - entry.getValue().validate(StringUtils.format("%s.%s", name, entry.getKey()), rac.findColumn(entry.getKey())); + final Column racColumn = rac.findColumn(entry.getKey()); + Assert.assertNotNull(racColumn); + entry.getValue().validate(StringUtils.format("%s.%s", name, entry.getKey()), racColumn); } } diff --git a/processing/src/test/java/org/apache/druid/query/operator/window/WindowAggregateProcessorTest.java b/processing/src/test/java/org/apache/druid/query/operator/window/WindowAggregateProcessorTest.java deleted file mode 100644 index 99581dbf176..00000000000 --- a/processing/src/test/java/org/apache/druid/query/operator/window/WindowAggregateProcessorTest.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query.operator.window; - -import org.apache.druid.common.config.NullHandling; -import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory; -import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; -import org.apache.druid.query.aggregation.LongMaxAggregatorFactory; -import org.apache.druid.query.aggregation.LongSumAggregatorFactory; -import org.apache.druid.query.operator.window.ranking.WindowRowNumberProcessor; -import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; -import org.apache.druid.query.rowsandcols.RowsAndColumns; -import org.apache.druid.query.rowsandcols.column.Column; -import org.apache.druid.query.rowsandcols.column.DoubleArrayColumn; -import org.apache.druid.query.rowsandcols.column.IntArrayColumn; -import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn; -import org.apache.druid.segment.column.ColumnType; -import org.junit.Assert; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.LinkedHashMap; -import java.util.Map; - -public class WindowAggregateProcessorTest -{ - static { - NullHandling.initializeForTests(); - } - - @Test - public void testAggregation() - { - Map map = new LinkedHashMap<>(); - map.put("intCol", new IntArrayColumn(new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})); - map.put("doubleCol", new DoubleArrayColumn(new double[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})); - map.put("objectCol", new ObjectArrayColumn( - new String[]{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}, - ColumnType.STRING - ) - ); - - MapOfColumnsRowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap(map); - - WindowAggregateProcessor processor = new WindowAggregateProcessor( - Arrays.asList( - new LongSumAggregatorFactory("sumFromLong", "intCol"), - new LongSumAggregatorFactory("sumFromDouble", "doubleCol"), - new DoubleMaxAggregatorFactory("maxFromInt", "intCol"), - new DoubleMaxAggregatorFactory("maxFromDouble", "doubleCol") - ), - Arrays.asList( - new LongMaxAggregatorFactory("cummMax", "intCol"), - new DoubleSumAggregatorFactory("cummSum", "doubleCol") - ) - ); - - RowsAndColumnsHelper expectations = new RowsAndColumnsHelper() - .expectColumn("intCol", new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) - .expectColumn("doubleCol", new double[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) - .expectColumn("sumFromLong", new long[]{45, 45, 45, 45, 45, 45, 45, 45, 45, 45}) - .expectColumn("sumFromDouble", new long[]{45, 45, 45, 45, 45, 45, 45, 45, 45, 45}) - .expectColumn("maxFromInt", new double[]{9, 9, 9, 9, 9, 9, 9, 9, 9, 9}) - .expectColumn("maxFromDouble", new double[]{9, 9, 9, 9, 9, 9, 9, 9, 9, 9}) - .expectColumn("cummMax", new long[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) - .expectColumn("cummSum", new double[]{0, 1, 3, 6, 10, 15, 21, 28, 36, 45}); - - final RowsAndColumns results = processor.process(rac); - expectations.validate(results); - } - - @Test - public void testValidateEquality() - { - WindowAggregateProcessor processor = new WindowAggregateProcessor( - Arrays.asList( - new LongSumAggregatorFactory("sumFromLong", "intCol"), - new LongSumAggregatorFactory("sumFromDouble", "doubleCol"), - new DoubleMaxAggregatorFactory("maxFromInt", "intCol"), - new DoubleMaxAggregatorFactory("maxFromDouble", "doubleCol") - ), - Arrays.asList( - new LongMaxAggregatorFactory("cummMax", "intCol"), - new DoubleSumAggregatorFactory("cummSum", "doubleCol") - ) - ); - - Assert.assertTrue(processor.validateEquivalent(processor)); - Assert.assertFalse(processor.validateEquivalent(new WindowRowNumberProcessor("bob"))); - Assert.assertFalse(processor.validateEquivalent(new WindowAggregateProcessor(processor.getAggregations(), null))); - Assert.assertFalse(processor.validateEquivalent( - new WindowAggregateProcessor(new ArrayList<>(), processor.getCumulativeAggregations()) - )); - Assert.assertFalse(processor.validateEquivalent(new WindowAggregateProcessor(new ArrayList<>(), null))); - } - -} diff --git a/processing/src/test/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessorTest.java b/processing/src/test/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessorTest.java index 706584a1c7c..145d37a4384 100644 --- a/processing/src/test/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessorTest.java +++ b/processing/src/test/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessorTest.java @@ -20,6 +20,7 @@ package org.apache.druid.query.operator.window; import com.google.common.collect.ImmutableMap; +import org.apache.druid.common.config.NullHandling; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; import org.apache.druid.query.aggregation.LongMaxAggregatorFactory; @@ -36,6 +37,10 @@ import org.junit.Test; @SuppressWarnings("unchecked") public class WindowFramedAggregateProcessorTest { + static { + NullHandling.initializeForTests(); + } + @Test public void testIsPassThruWhenRACReturnsSemanticInterface() { diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/ConcatRowsAndColumnsTest.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/ConcatRowsAndColumnsTest.java index d79bce526af..b95f750632d 100644 --- a/processing/src/test/java/org/apache/druid/query/rowsandcols/ConcatRowsAndColumnsTest.java +++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/ConcatRowsAndColumnsTest.java @@ -19,8 +19,6 @@ package org.apache.druid.query.rowsandcols; -import org.apache.druid.query.operator.LimitedRowsAndColumns; - import java.util.ArrayList; import java.util.function.Function; diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/column/NullColumnAccessorTest.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/column/NullColumnTest.java similarity index 88% rename from processing/src/test/java/org/apache/druid/query/rowsandcols/column/NullColumnAccessorTest.java rename to processing/src/test/java/org/apache/druid/query/rowsandcols/column/NullColumnTest.java index 0eed413bb29..552ee97ea72 100644 --- a/processing/src/test/java/org/apache/druid/query/rowsandcols/column/NullColumnAccessorTest.java +++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/column/NullColumnTest.java @@ -19,16 +19,18 @@ package org.apache.druid.query.rowsandcols.column; +import org.apache.druid.segment.column.ColumnType; import org.junit.Assert; import org.junit.Test; -public class NullColumnAccessorTest +public class NullColumnTest { @Test public void testSanity() { - NullColumnAccessor accessor = new NullColumnAccessor(10); + NullColumn col = new NullColumn(ColumnType.UNKNOWN_COMPLEX, 10); + ColumnAccessor accessor = col.toAccessor(); Assert.assertEquals(10, accessor.numRows()); for (int i = 0; i < 10; ++i) { diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/AppendableRowsAndColumnsTest.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/AppendableRowsAndColumnsTest.java index 57790ae0c0e..e5d6eb1faa7 100644 --- a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/AppendableRowsAndColumnsTest.java +++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/AppendableRowsAndColumnsTest.java @@ -24,8 +24,12 @@ import org.apache.druid.query.operator.window.RowsAndColumnsHelper; import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.column.IntArrayColumn; +import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn; +import org.apache.druid.segment.column.ColumnType; +import org.junit.Assert; import org.junit.Test; +import java.util.Collections; import java.util.function.Function; public class AppendableRowsAndColumnsTest extends SemanticTestBase @@ -60,4 +64,32 @@ public class AppendableRowsAndColumnsTest extends SemanticTestBase .validate(appender); } + @Test + public void testAppendableRowsAndColumnsCanBeUsedForClusterGrouper() + { + RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap( + ImmutableMap.of( + "unsorted", new IntArrayColumn(new int[]{3, 54, 21, 1, 5, 54, 2, 3, 92}) + ) + )); + + AppendableRowsAndColumns appender = RowsAndColumns.expectAppendable(rac); + + appender.addColumn( + "sorted", + new ObjectArrayColumn(new Object[]{null, null, null, 1, 1, 2, 4, 4, 4}, ColumnType.LONG) + ); + + new RowsAndColumnsHelper() + .expectColumn("unsorted", new int[]{3, 54, 21, 1, 5, 54, 2, 3, 92}) + .expectColumn("sorted", new Object[]{null, null, null, 1, 1, 2, 4, 4, 4}, ColumnType.LONG) + .allColumnsRegistered() + .validate(appender); + + final ClusteredGroupPartitioner parter = ClusteredGroupPartitioner.fromRAC(appender); + final int[] boundaries = parter.computeBoundaries(Collections.singletonList("sorted")); + + Assert.assertArrayEquals(new int[]{0, 3, 5, 6, 9}, boundaries); + } + } diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/ClusteredGroupPartitionerTest.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/ClusteredGroupPartitionerTest.java index 4196f644671..d9851b17530 100644 --- a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/ClusteredGroupPartitionerTest.java +++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/ClusteredGroupPartitionerTest.java @@ -24,6 +24,8 @@ import org.apache.druid.query.operator.window.RowsAndColumnsHelper; import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.column.IntArrayColumn; +import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn; +import org.apache.druid.segment.column.ColumnType; import org.junit.Assert; import org.junit.Test; @@ -44,6 +46,18 @@ public class ClusteredGroupPartitionerTest extends SemanticTestBase super(name, fn); } + @Test + public void testEmpty() + { + RowsAndColumns rac = make(new MapOfColumnsRowsAndColumns(ImmutableMap.of(), 0)); + + final ClusteredGroupPartitioner parter = ClusteredGroupPartitioner.fromRAC(rac); + + final List cols = Collections.singletonList("notThere"); + Assert.assertArrayEquals(new int[]{}, parter.computeBoundaries(cols)); + Assert.assertTrue(parter.partitionOnBoundaries(cols).isEmpty()); + } + @Test public void testDefaultClusteredGroupPartitioner() { @@ -54,10 +68,7 @@ public class ClusteredGroupPartitionerTest extends SemanticTestBase ) )); - ClusteredGroupPartitioner parter = rac.as(ClusteredGroupPartitioner.class); - if (parter == null) { - parter = new DefaultClusteredGroupPartitioner(rac); - } + ClusteredGroupPartitioner parter = ClusteredGroupPartitioner.fromRAC(rac); int[] expectedBounds = new int[]{0, 3, 5, 6, 9}; @@ -117,4 +128,48 @@ public class ClusteredGroupPartitionerTest extends SemanticTestBase } Assert.assertFalse(unsortedChunks.hasNext()); } + + @Test + public void testDefaultClusteredGroupPartitionerWithNulls() + { + RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap( + ImmutableMap.of( + "sorted", new ObjectArrayColumn(new Object[]{null, null, null, 1, 1, 2, 4, 4, 4}, ColumnType.LONG), + "unsorted", new IntArrayColumn(new int[]{3, 54, 21, 1, 5, 54, 2, 3, 92}) + ) + )); + + ClusteredGroupPartitioner parter = ClusteredGroupPartitioner.fromRAC(rac); + + int[] expectedBounds = new int[]{0, 3, 5, 6, 9}; + + List expectations = Arrays.asList( + new RowsAndColumnsHelper() + .expectColumn("sorted", new Object[]{null, null, null}, ColumnType.LONG) + .expectColumn("unsorted", new int[]{3, 54, 21}) + .allColumnsRegistered(), + new RowsAndColumnsHelper() + .expectColumn("sorted", new int[]{1, 1}) + .expectColumn("unsorted", new int[]{1, 5}) + .allColumnsRegistered(), + new RowsAndColumnsHelper() + .expectColumn("sorted", new int[]{2}) + .expectColumn("unsorted", new int[]{54}) + .allColumnsRegistered(), + new RowsAndColumnsHelper() + .expectColumn("sorted", new int[]{4, 4, 4}) + .expectColumn("unsorted", new int[]{2, 3, 92}) + .allColumnsRegistered() + ); + + final List partCols = Collections.singletonList("sorted"); + Assert.assertArrayEquals(expectedBounds, parter.computeBoundaries(partCols)); + + final Iterator partedChunks = parter.partitionOnBoundaries(partCols).iterator(); + for (RowsAndColumnsHelper expectation : expectations) { + Assert.assertTrue(partedChunks.hasNext()); + expectation.validate(partedChunks.next()); + } + Assert.assertFalse(partedChunks.hasNext()); + } } diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/CombinedSemanticInterfacesTest.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/CombinedSemanticInterfacesTest.java new file mode 100644 index 00000000000..9a5bce17c58 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/CombinedSemanticInterfacesTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.rowsandcols.semantic; + +import com.google.common.collect.ImmutableMap; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.operator.ColumnWithDirection; +import org.apache.druid.query.operator.window.RowsAndColumnsHelper; +import org.apache.druid.query.operator.window.WindowFrame; +import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; +import org.apache.druid.query.rowsandcols.RowsAndColumns; +import org.apache.druid.query.rowsandcols.column.ColumnAccessor; +import org.apache.druid.query.rowsandcols.column.IntArrayColumn; +import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn; +import org.apache.druid.segment.column.ColumnType; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.function.Function; + +/** + * Place where tests can live that are testing the interactions of multiple semantic interfaces + */ +@SuppressWarnings("ConstantConditions") +public class CombinedSemanticInterfacesTest extends SemanticTestBase +{ + public CombinedSemanticInterfacesTest( + String name, + Function fn + ) + { + super(name, fn); + } + + /** + * Tests a relatively common series of operations for window functions: partition -> aggregate -> sort + */ + @Test + public void testPartitionAggregateAndSortTest() + { + RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap( + ImmutableMap.of( + "sorted", new IntArrayColumn(new int[]{0, 0, 0, 1, 1, 2, 4, 4, 4}), + "unsorted", new IntArrayColumn(new int[]{3, 54, 21, 1, 5, 54, 2, 3, 92}) + ) + )); + + ClusteredGroupPartitioner parter = rac.as(ClusteredGroupPartitioner.class); + if (parter == null) { + parter = new DefaultClusteredGroupPartitioner(rac); + } + + final ArrayList partitioned = parter.partitionOnBoundaries(Collections.singletonList("sorted")); + Assert.assertEquals(4, partitioned.size()); + + NaiveSortMaker.NaiveSorter sorter = null; + for (RowsAndColumns rowsAndColumns : partitioned) { + final FramedOnHeapAggregatable aggregatable = FramedOnHeapAggregatable.fromRAC(rowsAndColumns); + final RowsAndColumns aggedRAC = aggregatable.aggregateAll( + WindowFrame.unbounded(), + new AggregatorFactory[]{new LongSumAggregatorFactory("sum", "unsorted")} + ); + if (sorter == null) { + sorter = NaiveSortMaker.fromRAC(aggedRAC).make(ColumnWithDirection.ascending("unsorted")); + } else { + Assert.assertNull(sorter.moreData(aggedRAC)); + } + } + Assert.assertNotNull(sorter); + + final RowsAndColumns completed = sorter.complete(); + Assert.assertNotNull(completed); + + new RowsAndColumnsHelper() + .expectColumn("sorted", new int[]{1, 4, 0, 4, 1, 0, 0, 2, 4}) + .expectColumn("unsorted", new int[]{1, 2, 3, 3, 5, 21, 54, 54, 92}) + .expectColumn("sum", new long[]{6, 97, 78, 97, 6, 78, 78, 54, 97}) + .allColumnsRegistered() + .validate(completed); + } + + /** + * Tests a relatively common series of operations for window functions: partition -> aggregate + */ + @Test + public void testPartitionThenAggregateNoSortTest() + { + RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap( + ImmutableMap.of( + "sorted", new IntArrayColumn(new int[]{0, 0, 0, 1, 1, 2, 4, 4, 4}), + "unsorted", new IntArrayColumn(new int[]{3, 54, 21, 1, 5, 54, 2, 3, 92}) + ) + )); + + ClusteredGroupPartitioner parter = ClusteredGroupPartitioner.fromRAC(rac); + + final ArrayList partitioned = parter.partitionOnBoundaries(Collections.singletonList("sorted")); + Assert.assertEquals(4, partitioned.size()); + + Map outputCols = new LinkedHashMap<>(); + outputCols.put("sorted", new ObjectArrayColumn(new Object[9], ColumnType.LONG)); + outputCols.put("unsorted", new ObjectArrayColumn(new Object[9], ColumnType.LONG)); + outputCols.put("sum", new ObjectArrayColumn(new Object[9], ColumnType.LONG)); + + int rowCounter = 0; + for (RowsAndColumns rowsAndColumns : partitioned) { + final FramedOnHeapAggregatable aggregatable = FramedOnHeapAggregatable.fromRAC(rowsAndColumns); + final RowsAndColumns aggedRAC = aggregatable.aggregateAll( + WindowFrame.unbounded(), + new AggregatorFactory[]{new LongSumAggregatorFactory("sum", "unsorted")} + ); + for (Map.Entry entry : outputCols.entrySet()) { + Object[] objArray = entry.getValue().getObjects(); + final ColumnAccessor columnAccessor = aggedRAC.findColumn(entry.getKey()).toAccessor(); + for (int i = 0; i < columnAccessor.numRows(); ++i) { + objArray[rowCounter + i] = columnAccessor.getObject(i); + } + } + rowCounter += aggedRAC.numRows(); + } + + RowsAndColumns completed = MapOfColumnsRowsAndColumns.fromMap(outputCols); + + new RowsAndColumnsHelper() + .expectColumn("sorted", new int[]{0, 0, 0, 1, 1, 2, 4, 4, 4}) + .expectColumn("unsorted", new int[]{3, 54, 21, 1, 5, 54, 2, 3, 92}) + .expectColumn("sum", new long[]{78, 78, 78, 6, 6, 54, 97, 97, 97}) + .allColumnsRegistered() + .validate(completed); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/OnHeapAggregatableTest.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/OnHeapAggregatableTest.java deleted file mode 100644 index c8f69d4a98c..00000000000 --- a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/OnHeapAggregatableTest.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query.rowsandcols.semantic; - -import com.google.common.collect.ImmutableMap; -import org.apache.druid.query.aggregation.LongMaxAggregatorFactory; -import org.apache.druid.query.aggregation.LongMinAggregatorFactory; -import org.apache.druid.query.aggregation.LongSumAggregatorFactory; -import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; -import org.apache.druid.query.rowsandcols.RowsAndColumns; -import org.apache.druid.query.rowsandcols.column.IntArrayColumn; -import org.junit.Assert; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.function.Function; - -public class OnHeapAggregatableTest extends SemanticTestBase -{ - public OnHeapAggregatableTest( - String name, - Function fn - ) - { - super(name, fn); - } - - @Test - public void testOnHeapAggregatable() - { - RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap( - ImmutableMap.of( - "incremented", new IntArrayColumn(new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}), - "zeroesOut", new IntArrayColumn(new int[]{4, -4, 3, -3, 4, 82, -90, 4, 0, 0}) - ) - )); - - OnHeapAggregatable agger = rac.as(OnHeapAggregatable.class); - if (agger == null) { - agger = new DefaultOnHeapAggregatable(rac); - } - - final ArrayList results = agger.aggregateAll(Arrays.asList( - new LongSumAggregatorFactory("incremented", "incremented"), - new LongMaxAggregatorFactory("zeroesOutMax", "zeroesOut"), - new LongMinAggregatorFactory("zeroesOutMin", "zeroesOut") - )); - - Assert.assertEquals(3, results.size()); - Assert.assertEquals(55L, results.get(0)); - Assert.assertEquals(82L, results.get(1)); - Assert.assertEquals(-90L, results.get(2)); - } -}