Semantic Implementations for ArrayListRAC (#13652)


This adds implementations of semantic interfaces
to optimize window processing on top of an
ArrayListSegment by eliminating object creation.

Tests are also added to cover the expected
interplay between the semantic interfaces
for this use case.
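
For context, the dispatch pattern that these semantic
interfaces enable looks roughly like the following sketch
(the type names come from this commit; rac and
partitionColumns are hypothetical locals):

// Ask the RowsAndColumns to act as a semantic interface via as();
// fall back to the generic default only when it cannot.
ClusteredGroupPartitioner partitioner = rac.as(ClusteredGroupPartitioner.class);
if (partitioner == null) {
  // Generic fallback: correct for any RowsAndColumns, but creates
  // accessor and wrapper objects on every call.
  partitioner = new DefaultClusteredGroupPartitioner(rac);
}
int[] boundaries = partitioner.computeBoundaries(partitionColumns);

ArrayListRowsAndColumns now answers the as() call with its own
implementations, so the object-creating fallback is skipped.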
imply-cheddar authored 2023-01-14 12:42:34 +09:00, committed by GitHub
parent 4368b3a071
commit 566fc990e4
39 changed files with 1216 additions and 716 deletions

View File

@ -35,7 +35,7 @@ import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.query.rowsandcols.column.NullColumnAccessor;
import org.apache.druid.query.rowsandcols.column.NullColumn;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
@ -165,7 +165,7 @@ public class WindowOperatorQueryQueryToolChest extends QueryToolChest<RowsAndCol
.getColumnType(columnName)
.orElse(ColumnType.UNKNOWN_COMPLEX);
accessors[index] = new NullColumnAccessor(columnType, rac.numRows());
accessors[index] = new NullColumn.Accessor(columnType, rac.numRows());
} else {
accessors[index] = column.toAccessor();
}

View File

@ -55,7 +55,6 @@ import org.apache.druid.query.rowsandcols.RowsAndColumns;
@JsonSubTypes.Type(name = "first", value = WindowFirstProcessor.class),
@JsonSubTypes.Type(name = "last", value = WindowLastProcessor.class),
@JsonSubTypes.Type(name = "offset", value = WindowOffsetProcessor.class),
@JsonSubTypes.Type(name = "aggregate", value = WindowAggregateProcessor.class),
@JsonSubTypes.Type(name = "framedAgg", value = WindowFramedAggregateProcessor.class)
})
public interface Processor

View File

@ -1,131 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.operator.window;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.rowsandcols.OnHeapCumulativeAggregatable;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.ConstantObjectColumn;
import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.DefaultOnHeapAggregatable;
import org.apache.druid.query.rowsandcols.semantic.OnHeapAggregatable;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
public class WindowAggregateProcessor implements Processor
{
@Nullable
private static <T> List<T> emptyToNull(List<T> list)
{
if (list == null || list.isEmpty()) {
return null;
} else {
return list;
}
}
private final List<AggregatorFactory> aggregations;
private final List<AggregatorFactory> cumulativeAggregations;
@JsonCreator
public WindowAggregateProcessor(
@JsonProperty("aggregations") List<AggregatorFactory> aggregations,
@JsonProperty("cumulativeAggregations") List<AggregatorFactory> cumulativeAggregations
)
{
this.aggregations = emptyToNull(aggregations);
this.cumulativeAggregations = emptyToNull(cumulativeAggregations);
}
@JsonProperty("aggregations")
public List<AggregatorFactory> getAggregations()
{
return aggregations;
}
@JsonProperty("cumulativeAggregations")
public List<AggregatorFactory> getCumulativeAggregations()
{
return cumulativeAggregations;
}
@Override
public RowsAndColumns process(RowsAndColumns inputPartition)
{
AppendableRowsAndColumns retVal = RowsAndColumns.expectAppendable(inputPartition);
if (aggregations != null) {
OnHeapAggregatable aggregatable = inputPartition.as(OnHeapAggregatable.class);
if (aggregatable == null) {
aggregatable = new DefaultOnHeapAggregatable(inputPartition);
}
final ArrayList<Object> aggregatedVals = aggregatable.aggregateAll(aggregations);
for (int i = 0; i < aggregations.size(); ++i) {
final AggregatorFactory agg = aggregations.get(i);
retVal.addColumn(
agg.getName(),
new ConstantObjectColumn(aggregatedVals.get(i), inputPartition.numRows(), agg.getResultType())
);
}
}
if (cumulativeAggregations != null) {
OnHeapCumulativeAggregatable cummulativeAgg = inputPartition.as(OnHeapCumulativeAggregatable.class);
if (cummulativeAgg == null) {
cummulativeAgg = new DefaultOnHeapAggregatable(inputPartition);
}
final ArrayList<Object[]> cumulativeVals = cummulativeAgg.aggregateCumulative(cumulativeAggregations);
for (int i = 0; i < cumulativeAggregations.size(); ++i) {
final AggregatorFactory agg = cumulativeAggregations.get(i);
retVal.addColumn(agg.getName(), new ObjectArrayColumn(cumulativeVals.get(i), agg.getResultType()));
}
}
return retVal;
}
@Override
public boolean validateEquivalent(Processor otherProcessor)
{
if (otherProcessor instanceof WindowAggregateProcessor) {
WindowAggregateProcessor other = (WindowAggregateProcessor) otherProcessor;
return Objects.equals(aggregations, other.aggregations)
&& Objects.equals(cumulativeAggregations, other.cumulativeAggregations);
}
return false;
}
@Override
public String toString()
{
return "WindowAggregateProcessor{" +
"aggregations=" + aggregations +
", cumulativeAggregations=" + cumulativeAggregations +
'}';
}
}

View File

@ -26,6 +26,11 @@ import java.util.Objects;
public class WindowFrame
{
public static WindowFrame unbounded()
{
return new WindowFrame(PeerType.ROWS, true, 0, true, 0);
}
@SuppressWarnings("unused")
public enum PeerType
{

View File

@ -19,25 +19,68 @@
package org.apache.druid.query.rowsandcols;
import it.unimi.dsi.fastutil.Arrays;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntComparator;
import it.unimi.dsi.fastutil.ints.IntList;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.operator.ColumnWithDirection;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.query.rowsandcols.column.ColumnValueSwapper;
import org.apache.druid.query.rowsandcols.column.DefaultVectorCopier;
import org.apache.druid.query.rowsandcols.column.LimitedColumn;
import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
import org.apache.druid.query.rowsandcols.column.ObjectColumnAccessorBase;
import org.apache.druid.query.rowsandcols.column.VectorCopier;
import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.ClusteredGroupPartitioner;
import org.apache.druid.query.rowsandcols.semantic.DefaultClusteredGroupPartitioner;
import org.apache.druid.query.rowsandcols.semantic.NaiveSortMaker;
import org.apache.druid.segment.RowAdapter;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
public class ArrayListRowsAndColumns<RowType> implements RowsAndColumns
/**
* ArrayListRowsAndColumns is a RowsAndColumns implementation that believes it has all of its data on-heap.
* <p>
* It is an AppendableRowsAndColumns and, as with all RowsAndColumns, it is not thread-safe for multiple writes.
* Under certain circumstances, concurrent reads from multiple threads can be correct, but the code has to follow
* a very strict ordering that ensures reads only happen after writes and that, once a value is read, it is
* never overwritten.
* <p>
* Additionally, this object implements several of the semantic interfaces directly to provide some degree
* of processing and memory optimization.
*
* @param <RowType>
*/
public class ArrayListRowsAndColumns<RowType> implements AppendableRowsAndColumns
{
@SuppressWarnings("rawtypes")
private static final HashMap<Class<?>, Function<ArrayListRowsAndColumns, ?>> AS_MAP = makeAsMap();
private final ArrayList<RowType> rows;
private final RowAdapter<RowType> rowAdapter;
private final RowSignature rowSignature;
private final Map<String, Column> extraColumns;
private final Set<String> columnNames;
private final int startOffset;
private final int endOffset;
public ArrayListRowsAndColumns(
ArrayList<RowType> rows,
@ -45,21 +88,49 @@ public class ArrayListRowsAndColumns<RowType> implements RowsAndColumns
RowSignature rowSignature
)
{
this(
rows,
rowAdapter,
rowSignature,
new LinkedHashMap<>(),
new LinkedHashSet<>(rowSignature.getColumnNames()),
0,
rows.size()
);
}
private ArrayListRowsAndColumns(
ArrayList<RowType> rows,
RowAdapter<RowType> rowAdapter,
RowSignature rowSignature,
Map<String, Column> extraColumns,
Set<String> columnNames,
int startOffset,
int endOffset
)
{
if (endOffset - startOffset < 0) {
throw new ISE("endOffset[%,d] - startOffset[%,d] was somehow negative!?", endOffset, startOffset);
}
this.rows = rows;
this.rowAdapter = rowAdapter;
this.rowSignature = rowSignature;
this.extraColumns = extraColumns;
this.columnNames = columnNames;
this.startOffset = startOffset;
this.endOffset = endOffset;
}
@Override
public Collection<String> getColumnNames()
{
return rowSignature.getColumnNames();
return columnNames;
}
@Override
public int numRows()
{
return rows.size();
return endOffset - startOffset;
}
@Override
@ -67,7 +138,11 @@ public class ArrayListRowsAndColumns<RowType> implements RowsAndColumns
public Column findColumn(String name)
{
if (!rowSignature.contains(name)) {
return null;
final Column retVal = extraColumns.get(name);
if (numRows() == rows.size()) {
return retVal;
}
return new LimitedColumn(retVal, startOffset, endOffset);
}
final Function<RowType, Object> adapterForValue = rowAdapter.columnFunction(name);
@ -77,6 +152,7 @@ public class ArrayListRowsAndColumns<RowType> implements RowsAndColumns
return new Column()
{
@Nonnull
@Override
public ColumnAccessor toAccessor()
{
@ -85,7 +161,7 @@ public class ArrayListRowsAndColumns<RowType> implements RowsAndColumns
@Override
protected Object getVal(int rowNum)
{
return adapterForValue.apply(rows.get(rowNum));
return adapterForValue.apply(rows.get(startOffset + rowNum));
}
@Override
@ -103,11 +179,12 @@ public class ArrayListRowsAndColumns<RowType> implements RowsAndColumns
@Override
public int numRows()
{
return rows.size();
return endOffset - startOffset;
}
};
}
@Nullable
@Override
public <T> T as(Class<? extends T> clazz)
{
@ -118,8 +195,340 @@ public class ArrayListRowsAndColumns<RowType> implements RowsAndColumns
@Nullable
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public <T> T as(Class<T> clazz)
{
return null;
final Function<ArrayListRowsAndColumns, ?> fn = AS_MAP.get(clazz);
if (fn == null) {
return null;
}
return (T) fn.apply(this);
}
@Override
public void addColumn(String name, Column column)
{
if (rows.size() == numRows()) {
extraColumns.put(name, column);
columnNames.add(name);
return;
}
// When an ArrayListRowsAndColumns is only a partial view but adds a column, it believes that the same column
// will eventually be added for all of the rows, so we pre-allocate storage for the entire set of data and
// copy.
final ColumnAccessor columnAccessor = column.toAccessor();
if (columnAccessor.numRows() != numRows()) {
throw new ISE("More rows[%,d] than expected[%,d]", columnAccessor.numRows(), numRows());
}
final Column extraColumn = extraColumns.get(name);
final ObjectArrayColumn existingColumn;
if (extraColumn == null) {
existingColumn = new ObjectArrayColumn(new Object[rows.size()], columnAccessor.getType());
extraColumns.put(name, existingColumn);
columnNames.add(name);
} else if (extraColumn instanceof ObjectArrayColumn) {
existingColumn = (ObjectArrayColumn) extraColumn;
} else {
throw new ISE(
"Partial column[%s] was added, but already have full column[%s]",
column.getClass(),
extraColumn.getClass()
);
}
VectorCopier copier = column.as(VectorCopier.class);
if (copier == null) {
copier = new DefaultVectorCopier(columnAccessor);
}
copier.copyInto(existingColumn.getObjects(), startOffset);
}
private ArrayListRowsAndColumns<RowType> limited(int startOffset, int endOffset)
{
return new ArrayListRowsAndColumns<>(
rows,
rowAdapter,
rowSignature,
extraColumns,
columnNames,
startOffset,
endOffset
);
}
public void sort(ArrayList<ColumnWithDirection> ordering)
{
ArrayList<IntComparator> comparators = new ArrayList<>(ordering.size());
for (ColumnWithDirection columnWithDirection : ordering) {
final Column column = findColumn(columnWithDirection.getColumn());
if (column != null) {
comparators.add(new IntComparator()
{
final ColumnAccessor accessor = column.toAccessor();
private final int directionInt = columnWithDirection.getDirection().getDirectionInt();
@Override
public int compare(int lhs, int rhs)
{
return accessor.compareRows(lhs, rhs) * directionInt;
}
});
}
}
ArrayList<ColumnValueSwapper> swappers = new ArrayList<>(extraColumns.size());
for (Map.Entry<String, Column> entry : extraColumns.entrySet()) {
final Column column = entry.getValue();
final ColumnValueSwapper swapper = column.as(ColumnValueSwapper.class);
if (swapper == null) {
throw new ISE("Column[%s] of type[%s] cannot be sorted.", entry.getKey(), column.getClass());
}
swappers.add(swapper);
}
Arrays.quickSort(
0,
rows.size(),
(lhs, rhs) -> {
for (IntComparator comparator : comparators) {
final int retVal = comparator.compare(lhs, rhs);
if (retVal != 0) {
return retVal;
}
}
return 0;
},
(lhs, rhs) -> {
RowType tmp = rows.get(lhs);
rows.set(lhs, rows.get(rhs));
rows.set(rhs, tmp);
for (ColumnValueSwapper swapper : swappers) {
swapper.swapValues(lhs, rhs);
}
}
);
}
@SuppressWarnings("rawtypes")
private static HashMap<Class<?>, Function<ArrayListRowsAndColumns, ?>> makeAsMap()
{
HashMap<Class<?>, Function<ArrayListRowsAndColumns, ?>> retVal = new HashMap<>();
retVal.put(
ClusteredGroupPartitioner.class,
(Function<ArrayListRowsAndColumns, ClusteredGroupPartitioner>) rac -> rac.new MyClusteredGroupPartitioner()
);
retVal.put(
NaiveSortMaker.class,
(Function<ArrayListRowsAndColumns, NaiveSortMaker>) rac -> {
if (rac.startOffset != 0) {
throw new ISE(
"The NaiveSortMaker should happen on the first RAC, start was [%,d], end was [%,d]",
rac.startOffset,
rac.endOffset
);
}
if (rac.endOffset == rac.rows.size()) {
// In this case, we are being sorted along with other RowsAndColumns objects, we don't have an optimized
// implementation for that, so just return null
//noinspection ReturnOfNull
return null;
}
// When we are doing a naive sort and we are dealing with the first sub-window from ourselves, we assume
// that we will see all of the other sub-windows as well, so we can run through them and then sort the
// underlying rows at the very end.
return rac.new MyNaiveSortMaker();
}
);
return retVal;
}
private class MyClusteredGroupPartitioner implements ClusteredGroupPartitioner
{
@Override
public int[] computeBoundaries(List<String> columns)
{
if (numRows() == 0) {
return new int[]{};
}
boolean allInSignature = true;
for (String column : columns) {
if (!rowSignature.contains(column)) {
allInSignature = false;
}
}
if (allInSignature) {
return computeBoundariesAllInSignature(columns);
} else {
return computeBoundariesSomeAppended(columns);
}
}
/**
* Computes boundaries assuming all columns are defined as in the signature. Given that
* ArrayListRowsAndColumns is a fundamentally row-oriented data structure, a row-oriented
* algorithm should prove better than the column-oriented implementation in DefaultClusteredGroupPartitioner.
*
* @param columns the columns to partition on as in {@link #computeBoundaries(List)}
* @return the partition boundaries as in {@link #computeBoundaries(List)}
*/
private int[] computeBoundariesAllInSignature(List<String> columns)
{
ArrayList<Comparator<Object>> comparators = new ArrayList<>(columns.size());
ArrayList<Function<RowType, Object>> adapters = new ArrayList<>(columns.size());
for (String column : columns) {
final Optional<ColumnType> columnType = rowSignature.getColumnType(column);
if (columnType.isPresent()) {
comparators.add(Comparator.nullsFirst(columnType.get().getStrategy()));
adapters.add(rowAdapter.columnFunction(column));
} else {
throw new ISE("column didn't exist!? Other method should've been called...");
}
}
IntList boundaries = new IntArrayList();
Object[] prevVal = new Object[comparators.size()];
Object[] nextVal = new Object[comparators.size()];
int numRows = endOffset - startOffset;
boundaries.add(0);
RowType currRow = rows.get(startOffset);
for (int i = 0; i < adapters.size(); ++i) {
prevVal[i] = adapters.get(i).apply(currRow);
}
for (int i = 1; i < numRows; ++i) {
currRow = rows.get(startOffset + i);
for (int j = 0; j < adapters.size(); ++j) {
nextVal[j] = adapters.get(j).apply(currRow);
}
for (int j = 0; j < comparators.size(); ++j) {
final int comparison = comparators.get(j).compare(prevVal[j], nextVal[j]);
if (comparison != 0) {
// Swap references
Object[] tmpRef = prevVal;
prevVal = nextVal;
nextVal = tmpRef;
boundaries.add(i);
break;
}
}
}
boundaries.add(numRows);
return boundaries.toIntArray();
}
/**
* Computes boundaries including some columns that were appended later. In this case, we are fundamentally
* mixing some row-oriented format with some column-oriented format. It's hard to determine if there's really
* an optimized form for this (or, really, it's hard to know if the optimized form would actually be worth
* the code complexity), so we just fall back to the DefaultClusteredGroupPartitioner to compute these boundaries;
* the optimizations that come from re-using the ArrayListRowsAndColumns will continue to exist.
*
* @param columns the columns to partition on as in {@link #computeBoundaries(List)}
* @return the partition boundaries as in {@link #computeBoundaries(List)}
*/
private int[] computeBoundariesSomeAppended(List<String> columns)
{
return new DefaultClusteredGroupPartitioner(ArrayListRowsAndColumns.this).computeBoundaries(columns);
}
@Override
public ArrayList<RowsAndColumns> partitionOnBoundaries(List<String> partitionColumns)
{
final int[] boundaries = computeBoundaries(partitionColumns);
if (boundaries.length < 2) {
return new ArrayList<>();
}
ArrayList<RowsAndColumns> retVal = new ArrayList<>(boundaries.length - 1);
for (int i = 1; i < boundaries.length; ++i) {
int start = boundaries[i - 1];
int end = boundaries[i];
retVal.add(limited(start, end));
}
return retVal;
}
}
private class MyNaiveSortMaker implements NaiveSortMaker
{
@Override
public NaiveSorter make(ArrayList<ColumnWithDirection> ordering)
{
return new NaiveSorter()
{
private int currEnd = endOffset;
@SuppressWarnings("unchecked")
@Override
public RowsAndColumns moreData(RowsAndColumns rac)
{
if (currEnd == rows.size()) {
// It's theoretically possible that this object is used in a place where it sees a bunch of parts from
// the same ArrayListRowsAndColumns and then, continues to receive more and more RowsAndColumns objects
// from other ArrayListRowsAndColumns. In that case, we can
// 1. do a localized sort
// 2. continue to build up the other objects
// 3. once all objects are complete, do a merge-sort between them and return that
//
// This is not currently implemented, however, as the code cannot actually ever generate that sequence
// of objects. Additionally, if the code ever did generate that sequence, the proper solution might be
// to implement something different (perhaps use a different type of RowsAndColumns entirely).
// As such, we leave this implementation as an exercise for the future when it is better known why the
// code needed to work with this specific series of concrete objects.
throw new ISE("More data came after completing the ArrayList, not supported yet.");
}
if (rac instanceof ArrayListRowsAndColumns) {
ArrayListRowsAndColumns<RowType> arrayRac = (ArrayListRowsAndColumns<RowType>) rac;
if (arrayRac.startOffset != currEnd) {
throw new ISE(
"ArrayRAC instances seen out-of-order!? currEnd[%,d], arrayRac[%,d][%,d]",
currEnd,
arrayRac.startOffset,
arrayRac.endOffset
);
}
currEnd = arrayRac.endOffset;
return null;
} else {
throw new ISE("Expected an ArrayListRowsAndColumns, got[%s], fall back to default?", rac.getClass());
}
}
@Override
public RowsAndColumns complete()
{
if (currEnd != rows.size()) {
throw new ISE("Didn't see all of the rows? currEnd[%,d], rows.size()[%,d]", currEnd, rows.size());
}
final ArrayListRowsAndColumns<RowType> retVal = limited(0, rows.size());
retVal.sort(ordering);
return retVal;
}
};
}
}
}
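
As an aside, the partial-view addColumn behavior above can be illustrated without any Druid types; a minimal
sketch, assuming two adjacent two-row views over a four-row backing array:

// Each partial view copies its values into one shared, full-size array at
// its own start offset, mirroring addColumn() on a limited view.
Object[] backing = new Object[4];                            // pre-allocated for all rows
System.arraycopy(new Object[]{"a", "b"}, 0, backing, 0, 2);  // view over rows [0, 2)
System.arraycopy(new Object[]{"c", "d"}, 0, backing, 2, 2);  // view over rows [2, 4)
// backing now holds the full column: ["a", "b", "c", "d"]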

View File

@ -22,6 +22,7 @@ package org.apache.druid.query.rowsandcols;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.query.rowsandcols.column.NullColumn;
import org.apache.druid.segment.column.ColumnType;
import javax.annotation.Nonnull;
@ -33,6 +34,13 @@ import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* A RowsAndColumns implementation that effectively concatenates multiple RowsAndColumns objects together.
* <p>
* That is, if it is given 3 RowsAndColumns objects, of sizes 4, 2, and 3 respectively, the resulting
* RowsAndColumns object will have 9 rows where rows 0-to-3 will be from the first object, 4-to-5 from the next and
* 6-to-8 from the last.
*/
public class ConcatRowsAndColumns implements RowsAndColumns
{
private final ArrayList<RowsAndColumns> racBuffer;
@ -101,7 +109,11 @@ public class ConcatRowsAndColumns implements RowsAndColumns
final ColumnType type = firstAccessor.getType();
for (int i = 1; i < racBuffer.size(); ++i) {
RowsAndColumns rac = racBuffer.get(i);
final Column col = rac.findColumn(name);
Column col = rac.findColumn(name);
if (col == null) {
// It doesn't exist, so must be all null!
col = new NullColumn(type, rac.numRows());
}
final ColumnAccessor accessor = col.toAccessor();
if (!type.equals(accessor.getType())) {
throw new ISE("Type mismatch, expected[%s], got[%s] on entry[%,d]", type, accessor.getType(), i);
@ -109,7 +121,11 @@ public class ConcatRowsAndColumns implements RowsAndColumns
accessors.add(accessor);
}
final ConcatedidColumn retVal = new ConcatedidColumn(type, type.getStrategy(), accessors);
final ConcatedidColumn retVal = new ConcatedidColumn(
type,
Comparator.nullsFirst(type.getStrategy()),
accessors
);
columnCache.put(name, retVal);
return retVal;
}
@ -232,6 +248,7 @@ public class ConcatRowsAndColumns implements RowsAndColumns
};
}
@Nullable
@Override
public <T> T as(Class<? extends T> clazz)
{
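
To make the concatenation concrete, the global-to-local row mapping described in the class comment can be
sketched as follows (self-contained; the variable names are illustrative, not the class's actual fields):

int[] sizes = {4, 2, 3};        // sizes of the concatenated RACs
int[] rowStarts = {0, 4, 6};    // cumulative start offsets

// Map a global row id to (chunk index, local row id), as a concatenated
// column accessor must do for every read.
int globalRow = 5;
int chunk = 0;
while (chunk + 1 < rowStarts.length && globalRow >= rowStarts[chunk + 1]) {
  ++chunk;
}
int localRow = globalRow - rowStarts[chunk];
// globalRow 5 -> chunk 1 (the 2-row RAC), localRow 1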

View File

@ -17,13 +17,11 @@
* under the License.
*/
package org.apache.druid.query.operator;
package org.apache.druid.query.rowsandcols;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.operator.window.value.ShiftedColumnAccessorBase;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.query.rowsandcols.column.LimitedColumn;
import javax.annotation.Nullable;
import java.util.Collection;
@ -66,44 +64,7 @@ public class LimitedRowsAndColumns implements RowsAndColumns
return null;
}
return new Column()
{
@Override
public ColumnAccessor toAccessor()
{
final ColumnAccessor columnAccessor = column.toAccessor();
return new ShiftedColumnAccessorBase(columnAccessor)
{
@Override
public int numRows()
{
return end - start;
}
@Override
protected int getActualValue(int rowNum)
{
int retVal = start + rowNum;
if (retVal >= end) {
throw new ISE("Index out of bounds[%d] >= [%d], start[%s]", retVal, end, start);
}
return retVal;
}
@Override
protected boolean outsideBounds(int rowNum)
{
return false;
}
};
}
@Override
public <T> T as(Class<? extends T> clazz)
{
return null;
}
};
return new LimitedColumn(column, start, end);
}
@Nullable
@ -112,4 +73,5 @@ public class LimitedRowsAndColumns implements RowsAndColumns
{
return null;
}
}

View File

@ -40,14 +40,15 @@ public class MapOfColumnsRowsAndColumns implements RowsAndColumns
return fromMap(ImmutableMap.of(name, col, name2, col2));
}
public static MapOfColumnsRowsAndColumns fromMap(Map<String, Column> map)
@SuppressWarnings("unchecked")
public static MapOfColumnsRowsAndColumns fromMap(Map<String, ? extends Column> map)
{
if (map == null || map.isEmpty()) {
throw new ISE("map[%s] cannot be null or empty.", map);
}
final Iterator<Map.Entry<String, Column>> iter = map.entrySet().iterator();
Map.Entry<String, Column> entry = iter.next();
final Iterator<? extends Map.Entry<String, ? extends Column>> iter = map.entrySet().iterator();
Map.Entry<String, ? extends Column> entry = iter.next();
int numCells = entry.getValue().toAccessor().numRows();
if (iter.hasNext()) {
entry = iter.next();
@ -62,7 +63,10 @@ public class MapOfColumnsRowsAndColumns implements RowsAndColumns
}
}
return new MapOfColumnsRowsAndColumns(map, map.values().iterator().next().toAccessor().numRows());
return new MapOfColumnsRowsAndColumns(
(Map<String, Column>) map,
map.values().iterator().next().toAccessor().numRows()
);
}
private final Map<String, Column> mapOfColumns;

View File

@ -1,48 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.rowsandcols;
import org.apache.druid.query.aggregation.AggregatorFactory;
import java.util.ArrayList;
import java.util.List;
/**
* A semantic interface used to cumulatively aggregate a list of AggregatorFactories across a given set of data
* <p>
* The aggregation specifically happens on-heap and should be used in places where it is known that the data
* set can be worked with entirely on-heap.
* <p>
* Note, as we implement frame-handling for window aggregations, it is expected that this interface will undergo a
* transformation. It might be deleted and replaced with something else, or might just see a change done in place.
* Either way, there is no assumption of enforced compatibility with this interface at this point in time.
*/
public interface OnHeapCumulativeAggregatable
{
/**
* Cumulatively aggregates the data using the {@code List<AggregatorFactory>} objects.
*
* @param aggFactories definition of aggregations to be done
* @return a list of objects, one per AggregatorFactory. That is, the length of the return list should be equal to
* the length of the aggFactories list passed as an argument, while the length of the internal {@code Object[]} will
* be equivalent to the number of rows
*/
ArrayList<Object[]> aggregateCumulative(List<AggregatorFactory> aggFactories);
}
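
Although this commit deletes the interface, the cumulative semantics it describes are easy to picture; a
sketch using a running sum, with plain arrays standing in for an AggregatorFactory:

// Cumulative aggregation keeps one result per row: a sum aggregator over
// rows {1, 2, 3} yields the per-row running totals {1, 3, 6}.
int[] rows = {1, 2, 3};
Object[] cumulative = new Object[rows.length];
int runningSum = 0;
for (int i = 0; i < rows.length; ++i) {
  runningSum += rows[i];
  cumulative[i] = runningSum;
}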

View File

@ -30,6 +30,19 @@ import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* This class exists to "decorate" a RowsAndColumns such that it pretends to exist in a new ordering.
* <p>
* The constructor generally takes an int[] of pointers; these pointers are used to re-map the rows from the
* RowsAndColumns object. That is, if the RowsAndColumns has 4 rows and the array {@code new int[]{3, 1, 2, 0}}
* is passed in, then the order of traversal of the rows will be {@code 3 -> 1 -> 2 -> 0}.
* <p>
* This can be useful for sorting potentially immutable data, as the list of pointers can identify the order
* that the rows should be traversed in. It can also be used for clustering like-data together.
* <p>
* While this avoids a copy, in cases where the data will be iterated regularly, it also generates a random-access
* pattern that is not always optimal.
*/
public class RearrangedRowsAndColumns implements RowsAndColumns
{
private final Map<String, Column> columnCache = new LinkedHashMap<>();
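
The pointer remapping from the comment, as a self-contained sketch using the {3, 1, 2, 0} example:

String[] values = {"r0", "r1", "r2", "r3"};  // underlying row order
int[] pointers = {3, 1, 2, 0};               // desired traversal order

// Reading row i of the rearranged view returns underlying row pointers[i].
for (int i = 0; i < pointers.length; ++i) {
  System.out.println(values[pointers[i]]);   // prints r3, r1, r2, r0
}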

View File

@ -21,7 +21,7 @@ package org.apache.druid.query.rowsandcols;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.OnHeapAggregatable;
import org.apache.druid.query.rowsandcols.semantic.FramedOnHeapAggregatable;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@ -39,8 +39,8 @@ import java.util.Collection;
* with a Rows and columns, we rely on semantic interfaces using the {@link RowsAndColumns#as} method instead.
* <p>
* That is, the expectation is that anything that works with a RowsAndColumns will tend to first ask the RowsAndColumns
* object to become some other interface, for example, an {@link OnHeapAggregatable}. If a RowsAndColumns knows how
* to do a good job as the requested interface, it can return its own concrete implementation of the interface and
* object to become some other interface, for example, a {@link FramedOnHeapAggregatable}. If a RowsAndColumns knows
* how to do a good job as the requested interface, it can return its own concrete implementation of the interface and
* run the necessary logic in its own optimized fashion. If the RowsAndColumns instance does not know how to implement
* the semantic interface, it is expected that a default implementation of the interface can be instantiated on top of
* the default column access mechanisms that the RowsAndColumns provides. Such default implementations should be

View File

@ -20,6 +20,7 @@
package org.apache.druid.query.rowsandcols.column;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
/**
* An interface representing a Column of data.
@ -37,7 +38,7 @@ public interface Column
{
/**
* Returns the column as a {@link ColumnAccessor}. Semantically, this would be equivalent to calling
* {@Code Column.as(ColumnAccessor.class)}. However, being able to implement this interface is part of the explicit
* {@code Column.as(ColumnAccessor.class)}. However, being able to provide a ColumnAccessor is part of the explicit
* contract of implementing this interface, so instead of relying on {@link #as}, which allows for returning null,
* we define a top-level method that should never return null.
*
@ -58,6 +59,6 @@ public interface Column
* @return A concrete implementation of the interface, or null if there is no meaningful optimization to be had
* through a local implementation of the interface.
*/
@SuppressWarnings("unused")
@Nullable
<T> T as(Class<? extends T> clazz);
}

View File

@ -19,6 +19,9 @@
package org.apache.druid.query.rowsandcols.column;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
public class ColumnAccessorBasedColumn implements Column
{
private final ColumnAccessor base;
@ -30,12 +33,14 @@ public class ColumnAccessorBasedColumn implements Column
this.base = base;
}
@Nonnull
@Override
public ColumnAccessor toAccessor()
{
return base;
}
@Nullable
@Override
public <T> T as(Class<? extends T> clazz)
{

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.rowsandcols.column;
/**
* A semantic interface for use with {@link Column} objects.
*
* This is used to swap values inside of a column. Note that this interface fundamentally mutates the underlying
* column. If a column cannot support mutation, it should not return an implementation of this interface.
*/
public interface ColumnValueSwapper
{
/**
* Swaps the values at the two row ids. There is no significance to "right" and "left"; it's just easier to name
* the parameters that way.
*
* @param lhs the left-hand-side rowId
* @param rhs the right-hand-side rowId
*/
void swapValues(int lhs, int rhs);
}
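
A minimal sketch of how a swapper pairs with an indirect sort; this mirrors the fastutil-based sort in
ArrayListRowsAndColumns above, with a bare int[] standing in for a column:

import it.unimi.dsi.fastutil.Arrays;

int[] vals = {3, 1, 2};
// The comparator ranks row ids by value; the swapper applies each reordering
// to the backing storage, the division of labor ColumnValueSwapper enables.
Arrays.quickSort(
    0,
    vals.length,
    (lhs, rhs) -> Integer.compare(vals[lhs], vals[rhs]),
    (lhs, rhs) -> {
      int tmp = vals[lhs];
      vals[lhs] = vals[rhs];
      vals[rhs] = tmp;
    }
);
// vals is now {1, 2, 3}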

View File

@ -19,21 +19,27 @@
package org.apache.druid.query.rowsandcols.column;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.segment.column.ColumnType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Arrays;
public class ConstantObjectColumn implements Column
{
private final Object obj;
private final int numCells;
private final int numRows;
private final ColumnType type;
public ConstantObjectColumn(Object obj, int numCells, ColumnType type)
public ConstantObjectColumn(Object obj, int numRows, ColumnType type)
{
this.obj = obj;
this.numCells = numCells;
this.numRows = numRows;
this.type = type;
}
@Nonnull
@Override
public ColumnAccessor toAccessor()
{
@ -48,7 +54,7 @@ public class ConstantObjectColumn implements Column
@Override
public int numRows()
{
return numCells;
return numRows;
}
@Override
@ -95,9 +101,24 @@ public class ConstantObjectColumn implements Column
};
}
@Nullable
@SuppressWarnings("unchecked")
@Override
public <T> T as(Class<? extends T> clazz)
{
if (VectorCopier.class.equals(clazz)) {
return (T) (VectorCopier) (into, intoStart) -> {
if (Integer.MAX_VALUE - numRows < intoStart) {
throw new ISE("too many rows!!! intoStart[%,d], numRows[%,d] combine to exceed max_int", intoStart, numRows);
}
Arrays.fill(into, intoStart, intoStart + numRows, obj);
};
}
if (ColumnValueSwapper.class.equals(clazz)) {
return (T) (ColumnValueSwapper) (lhs, rhs) -> {
};
}
return null;
}
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.rowsandcols.column;
import org.apache.druid.java.util.common.ISE;
public class DefaultVectorCopier implements VectorCopier
{
ColumnAccessor accessor;
public DefaultVectorCopier(ColumnAccessor accessor)
{
this.accessor = accessor;
}
@Override
public void copyInto(Object[] into, int intoStart)
{
final int numRows = accessor.numRows();
if (Integer.MAX_VALUE - numRows < intoStart) {
throw new ISE("too many rows!!! intoStart[%,d], numRows[%,d] combine to exceed max_int", intoStart, numRows);
}
for (int i = 0; i < numRows; ++i) {
into[intoStart + i] = accessor.getObject(i);
}
}
}

View File

@ -19,8 +19,12 @@
package org.apache.druid.query.rowsandcols.column;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.segment.column.ColumnType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
public class DoubleArrayColumn implements Column
{
private final double[] vals;
@ -32,6 +36,7 @@ public class DoubleArrayColumn implements Column
this.vals = vals;
}
@Nonnull
@Override
public ColumnAccessor toAccessor()
{
@ -93,9 +98,32 @@ public class DoubleArrayColumn implements Column
};
}
@Nullable
@SuppressWarnings("unchecked")
@Override
public <T> T as(Class<? extends T> clazz)
{
if (VectorCopier.class.equals(clazz)) {
return (T) (VectorCopier) (into, intoStart) -> {
if (Integer.MAX_VALUE - vals.length < intoStart) {
throw new ISE(
"too many rows!!! intoStart[%,d], vals.length[%,d] combine to exceed max_int",
intoStart,
vals.length
);
}
for (int i = 0; i < vals.length; ++i) {
into[intoStart + i] = vals[i];
}
};
}
if (ColumnValueSwapper.class.equals(clazz)) {
return (T) (ColumnValueSwapper) (lhs, rhs) -> {
double tmp = vals[lhs];
vals[lhs] = vals[rhs];
vals[rhs] = tmp;
};
}
return null;
}
}

View File

@ -19,8 +19,12 @@
package org.apache.druid.query.rowsandcols.column;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.segment.column.ColumnType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
public class IntArrayColumn implements Column
{
private final int[] vals;
@ -32,6 +36,7 @@ public class IntArrayColumn implements Column
this.vals = vals;
}
@Nonnull
@Override
public ColumnAccessor toAccessor()
{
@ -93,9 +98,32 @@ public class IntArrayColumn implements Column
};
}
@Nullable
@SuppressWarnings("unchecked")
@Override
public <T> T as(Class<? extends T> clazz)
{
if (VectorCopier.class.equals(clazz)) {
return (T) (VectorCopier) (into, intoStart) -> {
if (Integer.MAX_VALUE - vals.length < intoStart) {
throw new ISE(
"too many rows!!! intoStart[%,d], vals.length[%,d] combine to exceed max_int",
intoStart,
vals.length
);
}
for (int i = 0; i < vals.length; ++i) {
into[intoStart + i] = vals[i];
}
};
}
if (ColumnValueSwapper.class.equals(clazz)) {
return (T) (ColumnValueSwapper) (lhs, rhs) -> {
int tmp = vals[lhs];
vals[lhs] = vals[rhs];
vals[rhs] = tmp;
};
}
return null;
}
}

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.rowsandcols.column;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.operator.window.value.ShiftedColumnAccessorBase;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
public class LimitedColumn implements Column
{
private final Column column;
private final int start;
private final int end;
public LimitedColumn(Column column, int start, int end)
{
this.column = column;
this.start = start;
this.end = end;
}
@Nonnull
@Override
public ColumnAccessor toAccessor()
{
final ColumnAccessor columnAccessor = column.toAccessor();
return new ShiftedColumnAccessorBase(columnAccessor)
{
@Override
public int numRows()
{
return end - start;
}
@Override
protected int getActualValue(int rowNum)
{
int retVal = start + rowNum;
if (retVal >= end) {
throw new ISE("Index out of bounds[%d] >= [%d], start[%s]", retVal, end, start);
}
return retVal;
}
@Override
protected boolean outsideBounds(int rowNum)
{
return false;
}
};
}
@Nullable
@Override
public <T> T as(Class<? extends T> clazz)
{
return null;
}
}

View File

@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.rowsandcols.column;
import org.apache.druid.segment.column.ColumnType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
public class NullColumn implements Column
{
private final ColumnType type;
private final int numRows;
public NullColumn(
ColumnType type,
int numRows
)
{
this.type = type;
this.numRows = numRows;
}
@Nonnull
@Override
public ColumnAccessor toAccessor()
{
return new Accessor(type, numRows);
}
@Nullable
@Override
public <T> T as(Class<? extends T> clazz)
{
return null;
}
public static class Accessor implements ColumnAccessor
{
private final ColumnType type;
private final int size;
public Accessor(ColumnType type, int size)
{
this.type = type;
this.size = size;
}
@Override
public ColumnType getType()
{
return type;
}
@Override
public int numRows()
{
return size;
}
@Override
public boolean isNull(int rowNum)
{
return true;
}
@Nullable
@Override
public Object getObject(int rowNum)
{
return null;
}
@Override
public double getDouble(int rowNum)
{
return 0;
}
@Override
public float getFloat(int rowNum)
{
return 0;
}
@Override
public long getLong(int rowNum)
{
return 0;
}
@Override
public int getInt(int rowNum)
{
return 0;
}
@Override
public int compareRows(int lhsRowNum, int rhsRowNum)
{
return 0;
}
}
}

View File

@ -1,96 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.rowsandcols.column;
import org.apache.druid.segment.column.ColumnType;
import javax.annotation.Nullable;
public class NullColumnAccessor implements ColumnAccessor
{
private final ColumnType type;
private final int size;
public NullColumnAccessor(int size)
{
this(ColumnType.UNKNOWN_COMPLEX, size);
}
public NullColumnAccessor(ColumnType type, int size)
{
this.type = type;
this.size = size;
}
@Override
public ColumnType getType()
{
return type;
}
@Override
public int numRows()
{
return size;
}
@Override
public boolean isNull(int rowNum)
{
return true;
}
@Nullable
@Override
public Object getObject(int rowNum)
{
return null;
}
@Override
public double getDouble(int rowNum)
{
return 0;
}
@Override
public float getFloat(int rowNum)
{
return 0;
}
@Override
public long getLong(int rowNum)
{
return 0;
}
@Override
public int getInt(int rowNum)
{
return 0;
}
@Override
public int compareRows(int lhsRowNum, int rhsRowNum)
{
return 0;
}
}

View File

@ -21,6 +21,8 @@ package org.apache.druid.query.rowsandcols.column;
import org.apache.druid.segment.column.ColumnType;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Comparator;
public class ObjectArrayColumn implements Column
@ -41,6 +43,19 @@ public class ObjectArrayColumn implements Column
this.comparator = comparator;
}
/**
* Gets the underlying object array. This method is exposed on the concrete class explicitly to allow for
* mutation. This class does absolutely nothing to ensure that said mutation of the array is valid. It is up
* to the caller that is choosing to do this mutation to make sure that it is safe.
*
* @return the object array backing this column
*/
public Object[] getObjects()
{
return objects;
}
@Nonnull
@Override
public ColumnAccessor toAccessor()
{
@ -72,9 +87,21 @@ public class ObjectArrayColumn implements Column
};
}
@Nullable
@SuppressWarnings("unchecked")
@Override
public <T> T as(Class<? extends T> clazz)
{
if (VectorCopier.class.equals(clazz)) {
return (T) (VectorCopier) (into, intoStart) -> System.arraycopy(objects, 0, into, intoStart, objects.length);
}
if (ColumnValueSwapper.class.equals(clazz)) {
return (T) (ColumnValueSwapper) (lhs, rhs) -> {
Object tmp = objects[lhs];
objects[lhs] = objects[rhs];
objects[rhs] = tmp;
};
}
return null;
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.rowsandcols.column;
/**
* A semantic interface for use with {@link Column} objects. Has methods to request that the {@link Column} copy
* values into provided array structures. Note that this interface is primarily useful to <em>read</em> from
* {@link Column} objects and not to write to them.
*/
public interface VectorCopier
{
/**
* Copies all values from the underlying column <em>into</em> the {@code Object[]} passed into this method.
*
* @param into the object array to store the values
* @param intoStart the index of the into array to start writing into.
*/
void copyInto(Object[] into, int intoStart);
}
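
A sketch of the intended usage, mirroring how ArrayListRowsAndColumns.addColumn consumes this interface
(totalRows and chunks are hypothetical locals, not part of the API):

// Pre-allocate storage for all rows, then let each chunk copy itself in at
// its offset; DefaultVectorCopier is the accessor-based fallback.
Object[] storage = new Object[totalRows];
int offset = 0;
for (Column chunk : chunks) {
  VectorCopier copier = chunk.as(VectorCopier.class);
  if (copier == null) {
    copier = new DefaultVectorCopier(chunk.toAccessor());
  }
  copier.copyInto(storage, offset);
  offset += chunk.toAccessor().numRows();
}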

View File

@ -32,6 +32,15 @@ import java.util.List;
*/
public interface ClusteredGroupPartitioner
{
static ClusteredGroupPartitioner fromRAC(RowsAndColumns rac)
{
ClusteredGroupPartitioner retVal = rac.as(ClusteredGroupPartitioner.class);
if (retVal == null) {
retVal = new DefaultClusteredGroupPartitioner(rac);
}
return retVal;
}
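
For intuition, the contiguous boundaries described below behave like this self-contained sketch over a single
pre-clustered column (java.util imports assumed):

Object[] col = {"a", "a", "b", "b", "b", "c"};  // already clustered

List<Integer> boundaries = new ArrayList<>();
boundaries.add(0);
for (int i = 1; i < col.length; ++i) {
  if (!Objects.equals(col[i - 1], col[i])) {
    boundaries.add(i);                          // a new group starts at row i
  }
}
boundaries.add(col.length);
// boundaries == [0, 2, 5, 6]: groups span rows [0,2), [2,5), [5,6)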
/**
* Computes and returns a list of contiguous boundaries for independent groups. All rows in a specific grouping
* should have the same values for the identified columns. Additionally, as this is assuming it is dealing with

View File

@ -21,7 +21,7 @@ package org.apache.druid.query.rowsandcols.semantic;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;
import org.apache.druid.query.operator.LimitedRowsAndColumns;
import org.apache.druid.query.rowsandcols.LimitedRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
@ -43,6 +43,10 @@ public class DefaultClusteredGroupPartitioner implements ClusteredGroupPartition
@Override
public int[] computeBoundaries(List<String> columns)
{
if (rac.numRows() == 0) {
return new int[]{};
}
// Initialize to a grouping of everything
IntList boundaries = new IntArrayList(new int[]{0, rac.numRows()});
@ -78,6 +82,10 @@ public class DefaultClusteredGroupPartitioner implements ClusteredGroupPartition
public ArrayList<RowsAndColumns> partitionOnBoundaries(List<String> partitionColumns)
{
final int[] boundaries = computeBoundaries(partitionColumns);
if (boundaries.length < 2) {
return new ArrayList<>();
}
ArrayList<RowsAndColumns> retVal = new ArrayList<>(boundaries.length - 1);
for (int i = 1; i < boundaries.length; ++i) {

View File

@ -35,6 +35,7 @@ import org.apache.druid.segment.ObjectColumnSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
@ -50,6 +51,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
this.rac = rac;
}
@Nonnull
@Override
public RowsAndColumns aggregateAll(
WindowFrame frame,
@ -299,12 +301,10 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
int upperOffset
)
{
/**
* There are 3 different phases of operation when we have more rows than our window size
* 1. Our window is not full, as we walk the rows we build up towards filling it
* 2. Our window is full, as we walk the rows we take a value off and add a new aggregation
* 3. We are nearing the end of the rows, we need to start shrinking the window aperture
*/
// There are 3 different phases of operation when we have more rows than our window size
// 1. Our window is not full, as we walk the rows we build up towards filling it
// 2. Our window is full, as we walk the rows we take a value off and add a new aggregation
// 3. We are nearing the end of the rows, we need to start shrinking the window aperture
int numRows = rac.numRows();
int windowSize = lowerOffset + upperOffset + 1;
@ -434,15 +434,16 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
int upperOffset
)
{
int numRows = rac.numRows();
int windowSize = numRows;
// In this case, we need to store a value for all items, so our windowSize is equivalent to the number of rows
// from the RowsAndColumns object that we are using.
int windowSize = rac.numRows();
// We store the results in an Object array for convenience. This is definitely sub-par from a memory management
// point of view as we should use native arrays when possible. This will be fine for now, but it probably makes
// sense to look at optimizing this in the future. That said, such an optimization might best come by having
// a specialized implementation of this interface against, say, a Frame object that can deal with arrays instead
// of trying to optimize this generic implementation.
Object[][] results = new Object[aggFactories.length][numRows];
Object[][] results = new Object[aggFactories.length][windowSize];
int resultStorageIndex = 0;
AtomicInteger rowIdProvider = new AtomicInteger(0);
@ -458,11 +459,11 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
// This is the index to stop at for the current window aperture
// The first row is used by all of the results for the lowerOffset num results, plus 1 for the "current row"
int stopIndex = Math.min(lowerOffset + 1, numRows);
int stopIndex = Math.min(lowerOffset + 1, windowSize);
int startIndex = 0;
int rowId = rowIdProvider.get();
while (rowId < numRows) {
while (rowId < windowSize) {
for (Aggregator[] aggregator : aggregators) {
for (int j = startIndex; j < stopIndex; ++j) {
aggregator[j].aggregate();
@ -540,13 +541,16 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
}
@Override
public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec)
@Nonnull
public DimensionSelector makeDimensionSelector(@Nonnull DimensionSpec dimensionSpec)
{
throw new UOE("combining factory shouldn't need dimensions, just columnValue, dim[%s]", dimensionSpec);
}
@SuppressWarnings("rawtypes")
@Override
public ColumnValueSelector makeColumnValueSelector(String columnName)
@Nonnull
public ColumnValueSelector makeColumnValueSelector(@Nonnull String columnName)
{
return new ObjectColumnSelector()
{
@ -564,6 +568,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
}
@Override
@Nonnull
public Class classOfObject()
{
return results[index].getClass();
@ -573,7 +578,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
@Nullable
@Override
public ColumnCapabilities getColumnCapabilities(String column)
public ColumnCapabilities getColumnCapabilities(@Nonnull String column)
{
return columnCapabilities;
}
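
The three phases called out in the comment above show up in a tiny windowed-sum walkthrough; a sketch, not
the class's actual code, with lowerOffset = 1 and upperOffset = 1 over five rows:

int[] rows = {1, 2, 3, 4, 5};
int[] sums = new int[rows.length];
for (int i = 0; i < rows.length; ++i) {
  // Phase 1: window still filling (start clamped at 0). Phase 2: full
  // window. Phase 3: window shrinking as the end of the rows clamps it.
  int start = Math.max(0, i - 1);               // i - lowerOffset
  int end = Math.min(rows.length - 1, i + 1);   // i + upperOffset
  for (int j = start; j <= end; ++j) {
    sums[i] += rows[j];
  }
}
// sums == {3, 6, 9, 12, 9}: grows (3), holds full (6, 9, 12), shrinks (9)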

View File

@ -1,108 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.rowsandcols.semantic;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.rowsandcols.OnHeapCumulativeAggregatable;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
public class DefaultOnHeapAggregatable implements OnHeapAggregatable, OnHeapCumulativeAggregatable
{
private final RowsAndColumns rac;
public DefaultOnHeapAggregatable(
RowsAndColumns rac
)
{
this.rac = rac;
}
@Override
public ArrayList<Object> aggregateAll(
List<AggregatorFactory> aggFactories
)
{
Aggregator[] aggs = new Aggregator[aggFactories.size()];
AtomicInteger currRow = new AtomicInteger(0);
int index = 0;
for (AggregatorFactory aggFactory : aggFactories) {
aggs[index++] = aggFactory.factorize(new DefaultColumnSelectorFactoryMaker.ColumnAccessorBasedColumnSelectorFactory(
currRow,
rac
));
}
int numRows = rac.numRows();
int rowId = currRow.get();
while (rowId < numRows) {
for (Aggregator agg : aggs) {
agg.aggregate();
}
rowId = currRow.incrementAndGet();
}
ArrayList<Object> retVal = new ArrayList<>(aggs.length);
for (Aggregator agg : aggs) {
retVal.add(agg.get());
agg.close();
}
return retVal;
}
@Override
public ArrayList<Object[]> aggregateCumulative(List<AggregatorFactory> aggFactories)
{
Aggregator[] aggs = new Aggregator[aggFactories.size()];
ArrayList<Object[]> retVal = new ArrayList<>(aggFactories.size());
int numRows = rac.numRows();
AtomicInteger currRow = new AtomicInteger(0);
int index = 0;
for (AggregatorFactory aggFactory : aggFactories) {
aggs[index++] = aggFactory.factorize(new DefaultColumnSelectorFactoryMaker.ColumnAccessorBasedColumnSelectorFactory(
currRow,
rac
));
retVal.add(new Object[numRows]);
}
int rowId = currRow.get();
while (rowId < numRows) {
for (int i = 0; i < aggs.length; ++i) {
aggs[i].aggregate();
retVal.get(i)[rowId] = aggs[i].get();
}
rowId = currRow.incrementAndGet();
}
for (Aggregator agg : aggs) {
agg.close();
}
return retVal;
}
}

View File

@ -23,6 +23,8 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.operator.window.WindowFrame;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import javax.annotation.Nonnull;
/**
* A semantic interface used to aggregate a list of AggregatorFactories across a given set of data
* <p>
@ -53,5 +55,6 @@ public interface FramedOnHeapAggregatable
* @return a RowsAndColumns that contains columns representing the results of the aggregation
* from the AggregatorFactories
*/
@Nonnull
RowsAndColumns aggregateAll(WindowFrame frame, AggregatorFactory[] aggFactories);
}
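
For orientation, a minimal sketch of how a caller drives this interface, assuming the fromRAC helper and the factory/column names used in the tests further down; "sum" and "unsorted" are illustrative only:

// A hedged usage sketch, not part of the diff: aggregate an entire
// partition with an unbounded window frame.
FramedOnHeapAggregatable aggregatable = FramedOnHeapAggregatable.fromRAC(rac);
RowsAndColumns result = aggregatable.aggregateAll(
    WindowFrame.unbounded(),
    new AggregatorFactory[]{new LongSumAggregatorFactory("sum", "unsorted")}
);
// With the @Nonnull annotation above, callers may use result directly
// without a null check; it carries the input columns plus the new "sum" column.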

View File

@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
import org.apache.druid.query.operator.ColumnWithDirection;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import javax.annotation.Nullable;
import java.util.ArrayList;
/**
@ -53,6 +54,7 @@ public interface NaiveSortMaker
* @param rac the data to include in the sort
* @return optionally, a RowsAndColumns object of data that is known to be in sorted order; null if nothing can be returned yet.
*/
@Nullable
RowsAndColumns moreData(RowsAndColumns rac);
/**
@ -60,15 +62,12 @@ public interface NaiveSortMaker
*
* @return A RowsAndColumns object of sorted data that has not been returned already from {@link #moreData} calls.
*/
@Nullable
RowsAndColumns complete();
}
/**
* Makes the NaiveSorter that will actually do the sort. This method uses {@code List<OrderByColumnSpec>} to avoid
* littering the code with extra objects for the same thing. {@code OrderByColumnSpec} is only used to identify
* which column should be sorted and in which direction. Specifically, it has a "dimensionComparator" field which
* seems to indicate that it's possible to provide a specific comparator ordering; this should be completely ignored
* by implementations of the NaiveSorter interface.
* Makes the NaiveSorter that will actually do the sort.
*
* @param ordering a specification of which columns to sort in which direction
* @return a NaiveSorter that will sort according to the provided spec
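
A hedged sketch of the now-@Nullable moreData()/complete() contract, using the fromRAC helper and ColumnWithDirection calls from the tests further down; "unsorted" and the chunk variables are illustrative:

// Feed chunks of pre-sorted data into a NaiveSorter; moreData() may buffer
// internally and return null, and complete() returns whatever sorted data
// has not yet been emitted.
NaiveSortMaker.NaiveSorter sorter =
    NaiveSortMaker.fromRAC(firstChunk).make(ColumnWithDirection.ascending("unsorted"));
RowsAndColumns maybeSorted = sorter.moreData(nextChunk); // @Nullable
RowsAndColumns remainder = sorter.complete();            // @Nullable after this change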

View File

@ -1,47 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.rowsandcols.semantic;
import org.apache.druid.query.aggregation.AggregatorFactory;
import java.util.ArrayList;
import java.util.List;
/**
* A semantic interface used to aggregate a list of AggregatorFactories across a given set of data
* <p>
* The aggregation specifically happens on-heap and should be used in places where it is known that the data
* set can be worked with entirely on-heap.
* <p>
* Note, as we implement frame-handling for window aggregations, it is expected that this interface will undergo a
* transformation. It might be deleted and replaced with something else, or might just see a change done in place.
* Either way, there is no assumption of enforced compatibility with this interface at this point in time.
*/
public interface OnHeapAggregatable
{
/**
* Aggregates the data using the {@code List<AggregatorFactory>} objects.
*
* @param aggFactories definition of aggregations to be done
* @return a list of objects, one per AggregatorFactory. That is, the length of the return list should be equal to
* the length of the aggFactories list passed as an argument
*/
ArrayList<Object> aggregateAll(List<AggregatorFactory> aggFactories);
}

View File

@ -20,6 +20,7 @@
package org.apache.druid.query.operator.window;
import com.google.common.collect.ImmutableSet;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
@ -100,15 +101,23 @@ public class RowsAndColumnsHelper
public RowsAndColumnsHelper expectColumn(String col, ColumnType type, Object... expectedVals)
{
final ColumnHelper helper = columnHelper(col, expectedVals.length, type);
helper.setExpectation(expectedVals);
return this;
return expectColumn(col, expectedVals, type);
}
public RowsAndColumnsHelper expectColumn(String col, Object[] expectedVals, ColumnType type)
{
IntArrayList nullPositions = new IntArrayList();
for (int i = 0; i < expectedVals.length; i++) {
if (expectedVals[i] == null) {
nullPositions.add(i);
}
}
final ColumnHelper helper = columnHelper(col, expectedVals.length, type);
helper.setExpectation(expectedVals);
if (!nullPositions.isEmpty()) {
helper.setNulls(nullPositions.toIntArray());
}
return this;
}
@ -159,7 +168,9 @@ public class RowsAndColumnsHelper
}
for (Map.Entry<String, ColumnHelper> entry : helpers.entrySet()) {
entry.getValue().validate(StringUtils.format("%s.%s", name, entry.getKey()), rac.findColumn(entry.getKey()));
final Column racColumn = rac.findColumn(entry.getKey());
Assert.assertNotNull(racColumn);
entry.getValue().validate(StringUtils.format("%s.%s", name, entry.getKey()), racColumn);
}
}
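
A hedged sketch of the new overload in use, mirroring the call style in the tests further down; the column name, values, and rac variable are illustrative:

// The Object[]-based overload detects null positions itself, so a single
// call both sets the expected values and registers the expected nulls.
new RowsAndColumnsHelper()
    .expectColumn("sorted", new Object[]{null, null, 1L, 2L}, ColumnType.LONG)
    .allColumnsRegistered()
    .validate(rac);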

View File

@ -1,115 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.operator.window;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.operator.window.ranking.WindowRowNumberProcessor;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.DoubleArrayColumn;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
import org.apache.druid.segment.column.ColumnType;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
public class WindowAggregateProcessorTest
{
static {
NullHandling.initializeForTests();
}
@Test
public void testAggregation()
{
Map<String, Column> map = new LinkedHashMap<>();
map.put("intCol", new IntArrayColumn(new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
map.put("doubleCol", new DoubleArrayColumn(new double[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
map.put("objectCol", new ObjectArrayColumn(
new String[]{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"},
ColumnType.STRING
)
);
MapOfColumnsRowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap(map);
WindowAggregateProcessor processor = new WindowAggregateProcessor(
Arrays.asList(
new LongSumAggregatorFactory("sumFromLong", "intCol"),
new LongSumAggregatorFactory("sumFromDouble", "doubleCol"),
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
new DoubleMaxAggregatorFactory("maxFromDouble", "doubleCol")
),
Arrays.asList(
new LongMaxAggregatorFactory("cummMax", "intCol"),
new DoubleSumAggregatorFactory("cummSum", "doubleCol")
)
);
RowsAndColumnsHelper expectations = new RowsAndColumnsHelper()
.expectColumn("intCol", new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
.expectColumn("doubleCol", new double[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
.expectColumn("sumFromLong", new long[]{45, 45, 45, 45, 45, 45, 45, 45, 45, 45})
.expectColumn("sumFromDouble", new long[]{45, 45, 45, 45, 45, 45, 45, 45, 45, 45})
.expectColumn("maxFromInt", new double[]{9, 9, 9, 9, 9, 9, 9, 9, 9, 9})
.expectColumn("maxFromDouble", new double[]{9, 9, 9, 9, 9, 9, 9, 9, 9, 9})
.expectColumn("cummMax", new long[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
.expectColumn("cummSum", new double[]{0, 1, 3, 6, 10, 15, 21, 28, 36, 45});
final RowsAndColumns results = processor.process(rac);
expectations.validate(results);
}
@Test
public void testValidateEquality()
{
WindowAggregateProcessor processor = new WindowAggregateProcessor(
Arrays.asList(
new LongSumAggregatorFactory("sumFromLong", "intCol"),
new LongSumAggregatorFactory("sumFromDouble", "doubleCol"),
new DoubleMaxAggregatorFactory("maxFromInt", "intCol"),
new DoubleMaxAggregatorFactory("maxFromDouble", "doubleCol")
),
Arrays.asList(
new LongMaxAggregatorFactory("cummMax", "intCol"),
new DoubleSumAggregatorFactory("cummSum", "doubleCol")
)
);
Assert.assertTrue(processor.validateEquivalent(processor));
Assert.assertFalse(processor.validateEquivalent(new WindowRowNumberProcessor("bob")));
Assert.assertFalse(processor.validateEquivalent(new WindowAggregateProcessor(processor.getAggregations(), null)));
Assert.assertFalse(processor.validateEquivalent(
new WindowAggregateProcessor(new ArrayList<>(), processor.getCumulativeAggregations())
));
Assert.assertFalse(processor.validateEquivalent(new WindowAggregateProcessor(new ArrayList<>(), null)));
}
}

View File

@ -20,6 +20,7 @@
package org.apache.druid.query.operator.window;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
@ -36,6 +37,10 @@ import org.junit.Test;
@SuppressWarnings("unchecked")
public class WindowFramedAggregateProcessorTest
{
static {
NullHandling.initializeForTests();
}
@Test
public void testIsPassThruWhenRACReturnsSemanticInterface()
{

View File

@ -19,8 +19,6 @@
package org.apache.druid.query.rowsandcols;
import org.apache.druid.query.operator.LimitedRowsAndColumns;
import java.util.ArrayList;
import java.util.function.Function;

View File

@ -19,16 +19,18 @@
package org.apache.druid.query.rowsandcols.column;
import org.apache.druid.segment.column.ColumnType;
import org.junit.Assert;
import org.junit.Test;
public class NullColumnAccessorTest
public class NullColumnTest
{
@Test
public void testSanity()
{
NullColumnAccessor accessor = new NullColumnAccessor(10);
NullColumn col = new NullColumn(ColumnType.UNKNOWN_COMPLEX, 10);
ColumnAccessor accessor = col.toAccessor();
Assert.assertEquals(10, accessor.numRows());
for (int i = 0; i < 10; ++i) {

View File

@ -24,8 +24,12 @@ import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
import org.apache.druid.segment.column.ColumnType;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
import java.util.function.Function;
public class AppendableRowsAndColumnsTest extends SemanticTestBase
@ -60,4 +64,32 @@ public class AppendableRowsAndColumnsTest extends SemanticTestBase
.validate(appender);
}
@Test
public void testAppendableRowsAndColumnsCanBeUsedForClusterGrouper()
{
RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap(
ImmutableMap.of(
"unsorted", new IntArrayColumn(new int[]{3, 54, 21, 1, 5, 54, 2, 3, 92})
)
));
AppendableRowsAndColumns appender = RowsAndColumns.expectAppendable(rac);
appender.addColumn(
"sorted",
new ObjectArrayColumn(new Object[]{null, null, null, 1, 1, 2, 4, 4, 4}, ColumnType.LONG)
);
new RowsAndColumnsHelper()
.expectColumn("unsorted", new int[]{3, 54, 21, 1, 5, 54, 2, 3, 92})
.expectColumn("sorted", new Object[]{null, null, null, 1, 1, 2, 4, 4, 4}, ColumnType.LONG)
.allColumnsRegistered()
.validate(appender);
final ClusteredGroupPartitioner parter = ClusteredGroupPartitioner.fromRAC(appender);
final int[] boundaries = parter.computeBoundaries(Collections.singletonList("sorted"));
Assert.assertArrayEquals(new int[]{0, 3, 5, 6, 9}, boundaries);
}
}

View File

@ -24,6 +24,8 @@ import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
import org.apache.druid.segment.column.ColumnType;
import org.junit.Assert;
import org.junit.Test;
@ -44,6 +46,18 @@ public class ClusteredGroupPartitionerTest extends SemanticTestBase
super(name, fn);
}
@Test
public void testEmpty()
{
RowsAndColumns rac = make(new MapOfColumnsRowsAndColumns(ImmutableMap.of(), 0));
final ClusteredGroupPartitioner parter = ClusteredGroupPartitioner.fromRAC(rac);
final List<String> cols = Collections.singletonList("notThere");
Assert.assertArrayEquals(new int[]{}, parter.computeBoundaries(cols));
Assert.assertTrue(parter.partitionOnBoundaries(cols).isEmpty());
}
@Test
public void testDefaultClusteredGroupPartitioner()
{
@ -54,10 +68,7 @@ public class ClusteredGroupPartitionerTest extends SemanticTestBase
)
));
ClusteredGroupPartitioner parter = rac.as(ClusteredGroupPartitioner.class);
if (parter == null) {
parter = new DefaultClusteredGroupPartitioner(rac);
}
ClusteredGroupPartitioner parter = ClusteredGroupPartitioner.fromRAC(rac);
int[] expectedBounds = new int[]{0, 3, 5, 6, 9};
@ -117,4 +128,48 @@ public class ClusteredGroupPartitionerTest extends SemanticTestBase
}
Assert.assertFalse(unsortedChunks.hasNext());
}
@Test
public void testDefaultClusteredGroupPartitionerWithNulls()
{
RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap(
ImmutableMap.of(
"sorted", new ObjectArrayColumn(new Object[]{null, null, null, 1, 1, 2, 4, 4, 4}, ColumnType.LONG),
"unsorted", new IntArrayColumn(new int[]{3, 54, 21, 1, 5, 54, 2, 3, 92})
)
));
ClusteredGroupPartitioner parter = ClusteredGroupPartitioner.fromRAC(rac);
int[] expectedBounds = new int[]{0, 3, 5, 6, 9};
List<RowsAndColumnsHelper> expectations = Arrays.asList(
new RowsAndColumnsHelper()
.expectColumn("sorted", new Object[]{null, null, null}, ColumnType.LONG)
.expectColumn("unsorted", new int[]{3, 54, 21})
.allColumnsRegistered(),
new RowsAndColumnsHelper()
.expectColumn("sorted", new int[]{1, 1})
.expectColumn("unsorted", new int[]{1, 5})
.allColumnsRegistered(),
new RowsAndColumnsHelper()
.expectColumn("sorted", new int[]{2})
.expectColumn("unsorted", new int[]{54})
.allColumnsRegistered(),
new RowsAndColumnsHelper()
.expectColumn("sorted", new int[]{4, 4, 4})
.expectColumn("unsorted", new int[]{2, 3, 92})
.allColumnsRegistered()
);
final List<String> partCols = Collections.singletonList("sorted");
Assert.assertArrayEquals(expectedBounds, parter.computeBoundaries(partCols));
final Iterator<RowsAndColumns> partedChunks = parter.partitionOnBoundaries(partCols).iterator();
for (RowsAndColumnsHelper expectation : expectations) {
Assert.assertTrue(partedChunks.hasNext());
expectation.validate(partedChunks.next());
}
Assert.assertFalse(partedChunks.hasNext());
}
}
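
These tests now obtain their partitioner through ClusteredGroupPartitioner.fromRAC rather than the inline as()-with-fallback dance shown in the removed lines above. A sketch of the shape such a helper presumably takes, inferred from the code it replaces; the actual implementation may differ:

// Assumed shape of the static helper: prefer the semantic implementation the
// RowsAndColumns itself provides, else fall back to the default on-heap one.
static ClusteredGroupPartitioner fromRAC(RowsAndColumns rac)
{
  ClusteredGroupPartitioner retVal = rac.as(ClusteredGroupPartitioner.class);
  if (retVal == null) {
    retVal = new DefaultClusteredGroupPartitioner(rac);
  }
  return retVal;
}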

View File

@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.rowsandcols.semantic;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.operator.ColumnWithDirection;
import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
import org.apache.druid.query.operator.window.WindowFrame;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
import org.apache.druid.segment.column.ColumnType;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
/**
* A place for tests that exercise the interactions between multiple semantic interfaces
*/
@SuppressWarnings("ConstantConditions")
public class CombinedSemanticInterfacesTest extends SemanticTestBase
{
public CombinedSemanticInterfacesTest(
String name,
Function<MapOfColumnsRowsAndColumns, RowsAndColumns> fn
)
{
super(name, fn);
}
/**
* Tests a relatively common series of operations for window functions: partition -> aggregate -> sort
*/
@Test
public void testPartitionAggregateAndSortTest()
{
RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap(
ImmutableMap.of(
"sorted", new IntArrayColumn(new int[]{0, 0, 0, 1, 1, 2, 4, 4, 4}),
"unsorted", new IntArrayColumn(new int[]{3, 54, 21, 1, 5, 54, 2, 3, 92})
)
));
ClusteredGroupPartitioner parter = rac.as(ClusteredGroupPartitioner.class);
if (parter == null) {
parter = new DefaultClusteredGroupPartitioner(rac);
}
final ArrayList<RowsAndColumns> partitioned = parter.partitionOnBoundaries(Collections.singletonList("sorted"));
Assert.assertEquals(4, partitioned.size());
NaiveSortMaker.NaiveSorter sorter = null;
for (RowsAndColumns rowsAndColumns : partitioned) {
final FramedOnHeapAggregatable aggregatable = FramedOnHeapAggregatable.fromRAC(rowsAndColumns);
final RowsAndColumns aggedRAC = aggregatable.aggregateAll(
WindowFrame.unbounded(),
new AggregatorFactory[]{new LongSumAggregatorFactory("sum", "unsorted")}
);
if (sorter == null) {
sorter = NaiveSortMaker.fromRAC(aggedRAC).make(ColumnWithDirection.ascending("unsorted"));
} else {
Assert.assertNull(sorter.moreData(aggedRAC));
}
}
Assert.assertNotNull(sorter);
final RowsAndColumns completed = sorter.complete();
Assert.assertNotNull(completed);
new RowsAndColumnsHelper()
.expectColumn("sorted", new int[]{1, 4, 0, 4, 1, 0, 0, 2, 4})
.expectColumn("unsorted", new int[]{1, 2, 3, 3, 5, 21, 54, 54, 92})
.expectColumn("sum", new long[]{6, 97, 78, 97, 6, 78, 78, 54, 97})
.allColumnsRegistered()
.validate(completed);
}
/**
* Tests a relatively common series of operations for window functions: partition -> aggregate
*/
@Test
public void testPartitionThenAggregateNoSortTest()
{
RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap(
ImmutableMap.of(
"sorted", new IntArrayColumn(new int[]{0, 0, 0, 1, 1, 2, 4, 4, 4}),
"unsorted", new IntArrayColumn(new int[]{3, 54, 21, 1, 5, 54, 2, 3, 92})
)
));
ClusteredGroupPartitioner parter = ClusteredGroupPartitioner.fromRAC(rac);
final ArrayList<RowsAndColumns> partitioned = parter.partitionOnBoundaries(Collections.singletonList("sorted"));
Assert.assertEquals(4, partitioned.size());
Map<String, ObjectArrayColumn> outputCols = new LinkedHashMap<>();
outputCols.put("sorted", new ObjectArrayColumn(new Object[9], ColumnType.LONG));
outputCols.put("unsorted", new ObjectArrayColumn(new Object[9], ColumnType.LONG));
outputCols.put("sum", new ObjectArrayColumn(new Object[9], ColumnType.LONG));
int rowCounter = 0;
for (RowsAndColumns rowsAndColumns : partitioned) {
final FramedOnHeapAggregatable aggregatable = FramedOnHeapAggregatable.fromRAC(rowsAndColumns);
final RowsAndColumns aggedRAC = aggregatable.aggregateAll(
WindowFrame.unbounded(),
new AggregatorFactory[]{new LongSumAggregatorFactory("sum", "unsorted")}
);
for (Map.Entry<String, ObjectArrayColumn> entry : outputCols.entrySet()) {
Object[] objArray = entry.getValue().getObjects();
final ColumnAccessor columnAccessor = aggedRAC.findColumn(entry.getKey()).toAccessor();
for (int i = 0; i < columnAccessor.numRows(); ++i) {
objArray[rowCounter + i] = columnAccessor.getObject(i);
}
}
rowCounter += aggedRAC.numRows();
}
RowsAndColumns completed = MapOfColumnsRowsAndColumns.fromMap(outputCols);
new RowsAndColumnsHelper()
.expectColumn("sorted", new int[]{0, 0, 0, 1, 1, 2, 4, 4, 4})
.expectColumn("unsorted", new int[]{3, 54, 21, 1, 5, 54, 2, 3, 92})
.expectColumn("sum", new long[]{78, 78, 78, 6, 6, 54, 97, 97, 97})
.allColumnsRegistered()
.validate(completed);
}
}

View File

@ -1,72 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.rowsandcols.semantic;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.function.Function;
public class OnHeapAggregatableTest extends SemanticTestBase
{
public OnHeapAggregatableTest(
String name,
Function<MapOfColumnsRowsAndColumns, RowsAndColumns> fn
)
{
super(name, fn);
}
@Test
public void testOnHeapAggregatable()
{
RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap(
ImmutableMap.of(
"incremented", new IntArrayColumn(new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}),
"zeroesOut", new IntArrayColumn(new int[]{4, -4, 3, -3, 4, 82, -90, 4, 0, 0})
)
));
OnHeapAggregatable agger = rac.as(OnHeapAggregatable.class);
if (agger == null) {
agger = new DefaultOnHeapAggregatable(rac);
}
final ArrayList<Object> results = agger.aggregateAll(Arrays.asList(
new LongSumAggregatorFactory("incremented", "incremented"),
new LongMaxAggregatorFactory("zeroesOutMax", "zeroesOut"),
new LongMinAggregatorFactory("zeroesOutMin", "zeroesOut")
));
Assert.assertEquals(3, results.size());
Assert.assertEquals(55L, results.get(0));
Assert.assertEquals(82L, results.get(1));
Assert.assertEquals(-90L, results.get(2));
}
}