Reuse IndexedInts returned from DimensionSelector.getRow() implementations (#5172)

* Reuse IndexedInts in DimensionSelector implementations

* Remove BaseObjectColumnValueSelector.getObject() doc

* typo
This commit is contained in:
Roman Leventov 2018-01-17 16:01:26 +01:00 committed by GitHub
parent 241efafbb2
commit ad6cdf5d09
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 194 additions and 116 deletions

View File

@ -42,6 +42,7 @@ final class ForwardingFilteredDimensionSelector implements DimensionSelector, Id
private final IdLookup baseIdLookup; private final IdLookup baseIdLookup;
private final Int2IntOpenHashMap forwardMapping; private final Int2IntOpenHashMap forwardMapping;
private final int[] reverseMapping; private final int[] reverseMapping;
private final ArrayBasedIndexedInts row = new ArrayBasedIndexedInts();
/** /**
* @param selector must return true from {@link DimensionSelector#nameLookupPossibleInAdvance()} * @param selector must return true from {@link DimensionSelector#nameLookupPossibleInAdvance()}
@ -70,15 +71,17 @@ final class ForwardingFilteredDimensionSelector implements DimensionSelector, Id
{ {
IndexedInts baseRow = selector.getRow(); IndexedInts baseRow = selector.getRow();
int baseRowSize = baseRow.size(); int baseRowSize = baseRow.size();
int[] result = new int[baseRowSize]; row.ensureSize(baseRowSize);
int resultSize = 0; int resultSize = 0;
for (int i = 0; i < baseRowSize; i++) { for (int i = 0; i < baseRowSize; i++) {
int forwardedValue = forwardMapping.get(baseRow.get(i)); int forwardedValue = forwardMapping.get(baseRow.get(i));
if (forwardedValue >= 0) { if (forwardedValue >= 0) {
result[resultSize++] = forwardedValue; row.setValue(resultSize, forwardedValue);
resultSize++;
} }
} }
return ArrayBasedIndexedInts.of(result, resultSize); row.setSize(resultSize);
return row;
} }
@Override @Override

View File

@ -34,6 +34,7 @@ final class PredicateFilteredDimensionSelector implements DimensionSelector
{ {
private final DimensionSelector selector; private final DimensionSelector selector;
private final Predicate<String> predicate; private final Predicate<String> predicate;
private final ArrayBasedIndexedInts row = new ArrayBasedIndexedInts();
PredicateFilteredDimensionSelector(DimensionSelector selector, Predicate<String> predicate) PredicateFilteredDimensionSelector(DimensionSelector selector, Predicate<String> predicate)
{ {
@ -46,14 +47,16 @@ final class PredicateFilteredDimensionSelector implements DimensionSelector
{ {
IndexedInts baseRow = selector.getRow(); IndexedInts baseRow = selector.getRow();
int baseRowSize = baseRow.size(); int baseRowSize = baseRow.size();
int[] result = new int[baseRowSize]; row.ensureSize(baseRowSize);
int resultSize = 0; int resultSize = 0;
for (int i = 0; i < baseRowSize; i++) { for (int i = 0; i < baseRowSize; i++) {
if (predicate.apply(selector.lookupName(baseRow.get(i)))) { if (predicate.apply(selector.lookupName(baseRow.get(i)))) {
result[resultSize++] = i; row.setValue(resultSize, i);
resultSize++;
} }
} }
return ArrayBasedIndexedInts.of(result, resultSize); row.setSize(resultSize);
return row;
} }
@Override @Override

View File

@ -203,11 +203,14 @@ public class RowBasedColumnSelectorFactory implements ColumnSelectorFactory
} else { } else {
return new DimensionSelector() return new DimensionSelector()
{ {
private final RangeIndexedInts indexedInts = new RangeIndexedInts();
@Override @Override
public IndexedInts getRow() public IndexedInts getRow()
{ {
final List<String> dimensionValues = row.get().getDimension(dimension); final List<String> dimensionValues = row.get().getDimension(dimension);
return RangeIndexedInts.create(dimensionValues != null ? dimensionValues.size() : 0); indexedInts.setSize(dimensionValues != null ? dimensionValues.size() : 0);
return indexedInts;
} }
@Override @Override

View File

@ -68,21 +68,26 @@ public class DictionaryBuildingStringGroupByColumnSelectorStrategy extends Strin
{ {
final DimensionSelector dimSelector = (DimensionSelector) selector; final DimensionSelector dimSelector = (DimensionSelector) selector;
final IndexedInts row = dimSelector.getRow(); final IndexedInts row = dimSelector.getRow();
final int[] newIds = new int[row.size()]; ArrayBasedIndexedInts newRow = (ArrayBasedIndexedInts) valuess[columnIndex];
if (newRow == null) {
newRow = new ArrayBasedIndexedInts();
valuess[columnIndex] = newRow;
}
int rowSize = row.size();
newRow.ensureSize(rowSize);
for (int i = 0; i < row.size(); i++) { for (int i = 0; i < row.size(); i++) {
final String value = dimSelector.lookupName(row.get(i)); final String value = dimSelector.lookupName(row.get(i));
final int dictId = reverseDictionary.getInt(value); final int dictId = reverseDictionary.getInt(value);
if (dictId < 0) { if (dictId < 0) {
dictionary.add(value); dictionary.add(value);
reverseDictionary.put(value, nextId); reverseDictionary.put(value, nextId);
newIds[i] = nextId; newRow.setValue(i, nextId);
nextId++; nextId++;
} else { } else {
newIds[i] = dictId; newRow.setValue(i, dictId);
} }
} }
valuess[columnIndex] = ArrayBasedIndexedInts.of(newIds); newRow.setSize(rowSize);
} }
@Override @Override

View File

@ -22,7 +22,8 @@ package io.druid.segment;
import io.druid.guice.annotations.PublicApi; import io.druid.guice.annotations.PublicApi;
/** /**
* Base type for interfaces that manage column value selection, e.g. DimensionSelector, LongColumnSelector * Base type for interfaces that manage column value selection, e.g. {@link DimensionSelector}, {@link
* LongColumnSelector}.
* *
* This interface has methods to get the value in all primitive types, that have corresponding basic aggregators in * This interface has methods to get the value in all primitive types, that have corresponding basic aggregators in
* Druid: Sum, Min, Max, etc: {@link #getFloat()}, {@link #getDouble()} and {@link #getLong()} to support "polymorphic" * Druid: Sum, Min, Max, etc: {@link #getFloat()}, {@link #getDouble()} and {@link #getLong()} to support "polymorphic"

View File

@ -36,11 +36,15 @@ public interface DimensionSelector extends ColumnValueSelector, HotLoopCallee
int CARDINALITY_UNKNOWN = -1; int CARDINALITY_UNKNOWN = -1;
/** /**
* Gets all values for the row inside of an IntBuffer. I.e. one possible implementation could be * Returns the indexed values at the current position in this DimensionSelector.
* *
* return IntBuffer.wrap(lookupExpansion(get()); * IMPORTANT. The returned {@link IndexedInts} object could generally be reused inside the implementation of
* * DimensionSelector, i. e. this method could always return the same object for the same selector. Users
* @return all values for the row as an IntBuffer * of this API, such as {@link io.druid.query.aggregation.Aggregator#aggregate()}, {@link
* io.druid.query.aggregation.BufferAggregator#aggregate}, {@link io.druid.query.aggregation.AggregateCombiner#reset},
* {@link io.druid.query.aggregation.AggregateCombiner#fold} should be prepared for that and not storing the object
* returned from this method in their state, assuming that the object will remain unchanged even when the position of
* the selector changes. This may not be the case.
*/ */
@CalledFromHotLoop @CalledFromHotLoop
IndexedInts getRow(); IndexedInts getRow();

View File

@ -38,6 +38,7 @@ public class SingleScanTimeDimSelector implements DimensionSelector
private final boolean descending; private final boolean descending;
private final List<String> timeValues = new ArrayList<>(); private final List<String> timeValues = new ArrayList<>();
private final SingleIndexedInt row = new SingleIndexedInt();
private String currentValue = null; private String currentValue = null;
private long currentTimestamp = Long.MIN_VALUE; private long currentTimestamp = Long.MIN_VALUE;
private int index = -1; private int index = -1;
@ -61,7 +62,8 @@ public class SingleScanTimeDimSelector implements DimensionSelector
@Override @Override
public IndexedInts getRow() public IndexedInts getRow()
{ {
return SingleIndexedInt.of(getDimensionValueIndex()); row.setValue(getDimensionValueIndex());
return row;
} }
@Override @Override

View File

@ -366,6 +366,7 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
class IndexerDimensionSelector implements DimensionSelector, IdLookup class IndexerDimensionSelector implements DimensionSelector, IdLookup
{ {
private final ArrayBasedIndexedInts indexedInts = new ArrayBasedIndexedInts();
private int[] nullIdIntArray; private int[] nullIdIntArray;
@Override @Override
@ -405,7 +406,8 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
rowSize = indices.length; rowSize = indices.length;
} }
return ArrayBasedIndexedInts.of(row, rowSize); indexedInts.setValues(row, rowSize);
return indexedInts;
} }
@Override @Override

View File

@ -203,10 +203,13 @@ public class SimpleDictionaryEncodedColumn implements DictionaryEncodedColumn<St
class SingleValueQueryableDimensionSelector extends QueryableDimensionSelector class SingleValueQueryableDimensionSelector extends QueryableDimensionSelector
implements SingleValueHistoricalDimensionSelector implements SingleValueHistoricalDimensionSelector
{ {
private final SingleIndexedInt row = new SingleIndexedInt();
@Override @Override
public IndexedInts getRow() public IndexedInts getRow()
{ {
return SingleIndexedInt.of(getRowValue()); row.setValue(getRowValue());
return row;
} }
public int getRowValue() public int getRowValue()
@ -217,7 +220,8 @@ public class SimpleDictionaryEncodedColumn implements DictionaryEncodedColumn<St
@Override @Override
public IndexedInts getRow(int offset) public IndexedInts getRow(int offset)
{ {
return SingleIndexedInt.of(getRowValue(offset)); row.setValue(getRowValue(offset));
return row;
} }
@Override @Override

View File

@ -27,36 +27,55 @@ import it.unimi.dsi.fastutil.ints.IntArrays;
*/ */
public final class ArrayBasedIndexedInts implements IndexedInts public final class ArrayBasedIndexedInts implements IndexedInts
{ {
private static final ArrayBasedIndexedInts EMPTY = new ArrayBasedIndexedInts(IntArrays.EMPTY_ARRAY, 0); private int[] expansion;
private int size;
public static ArrayBasedIndexedInts of(int[] expansion) public ArrayBasedIndexedInts()
{ {
if (expansion.length == 0) { expansion = IntArrays.EMPTY_ARRAY;
return EMPTY; size = 0;
}
return new ArrayBasedIndexedInts(expansion, expansion.length);
} }
public static ArrayBasedIndexedInts of(int[] expansion, int size) public ArrayBasedIndexedInts(int[] expansion)
{
if (size == 0) {
return EMPTY;
}
if (size < 0 || size > expansion.length) {
throw new IAE("Size[%s] should be between 0 and %s", size, expansion.length);
}
return new ArrayBasedIndexedInts(expansion, size);
}
private final int[] expansion;
private final int size;
private ArrayBasedIndexedInts(int[] expansion, int size)
{ {
this.expansion = expansion; this.expansion = expansion;
this.size = expansion.length;
}
public void ensureSize(int size)
{
if (expansion.length < size) {
expansion = new int[size];
}
}
public void setSize(int size)
{
if (size < 0 || size > expansion.length) {
throw new IAE("Size[%d] > expansion.length[%d] or < 0", size, expansion.length);
}
this.size = size; this.size = size;
} }
/**
* Sets the values from the given array. The given values array is not reused and not prone to be mutated later.
* Instead, the values from this array are copied into an array which is internal to ArrayBasedIndexedInts.
*/
public void setValues(int[] values, int size)
{
if (size < 0 || size > values.length) {
throw new IAE("Size[%d] should be between 0 and %d", size, values.length);
}
ensureSize(size);
System.arraycopy(values, 0, expansion, 0, size);
this.size = size;
}
public void setValue(int index, int value)
{
expansion[index] = value;
}
@Override @Override
public int size() public int size()
{ {
@ -66,8 +85,8 @@ public final class ArrayBasedIndexedInts implements IndexedInts
@Override @Override
public int get(int index) public int get(int index)
{ {
if (index >= size) { if (index < 0 || index >= size) {
throw new IndexOutOfBoundsException("index: " + index + ", size: " + size); throw new IAE("index[%d] >= size[%d] or < 0", index, size);
} }
return expansion[index]; return expansion[index];
} }

View File

@ -147,10 +147,13 @@ public class CompressedVSizeColumnarMultiIntsSupplier implements WritableSupplie
private final ColumnarInts offsets; private final ColumnarInts offsets;
private final ColumnarInts values; private final ColumnarInts values;
private final SliceIndexedInts rowValues;
CompressedVSizeColumnarMultiInts(ColumnarInts offsets, ColumnarInts values) CompressedVSizeColumnarMultiInts(ColumnarInts offsets, ColumnarInts values)
{ {
this.offsets = offsets; this.offsets = offsets;
this.values = values; this.values = values;
this.rowValues = new SliceIndexedInts(values);
} }
@Override @Override
@ -177,30 +180,8 @@ public class CompressedVSizeColumnarMultiIntsSupplier implements WritableSupplie
{ {
final int offset = offsets.get(index); final int offset = offsets.get(index);
final int size = offsets.get(index + 1) - offset; final int size = offsets.get(index + 1) - offset;
rowValues.setValues(offset, size);
return new IndexedInts() return rowValues;
{
@Override
public int size()
{
return size;
}
@Override
public int get(int index)
{
if (index >= size) {
throw new IAE("Index[%d] >= size[%d]", index, size);
}
return values.get(index + offset);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("values", values);
}
};
} }
@Override @Override

View File

@ -19,40 +19,28 @@
package io.druid.segment.data; package io.druid.segment.data;
import com.google.common.base.Preconditions; import io.druid.java.util.common.IAE;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
/** /**
* An IndexedInts that always returns [0, 1, ..., N]. * Reusable IndexedInts that returns sequences [0, 1, ..., N].
*/ */
public class RangeIndexedInts implements IndexedInts public class RangeIndexedInts implements IndexedInts
{ {
private static final int CACHE_LIMIT = 8; private int size;
private static final RangeIndexedInts[] CACHE = new RangeIndexedInts[CACHE_LIMIT];
static { public RangeIndexedInts()
for (int i = 0; i < CACHE_LIMIT; i++) { {
CACHE[i] = new RangeIndexedInts(i);
}
} }
private final int size; public void setSize(int size)
private RangeIndexedInts(int size)
{ {
if (size < 0) {
throw new IAE("Size[%d] must be non-negative", size);
}
this.size = size; this.size = size;
} }
public static RangeIndexedInts create(final int size)
{
Preconditions.checkArgument(size >= 0, "size >= 0");
if (size < CACHE_LIMIT) {
return CACHE[size];
} else {
return new RangeIndexedInts(size);
}
}
@Override @Override
public int size() public int size()
{ {
@ -63,7 +51,7 @@ public class RangeIndexedInts implements IndexedInts
public int get(int index) public int get(int index)
{ {
if (index < 0 || index >= size) { if (index < 0 || index >= size) {
throw new IndexOutOfBoundsException("index: " + index); throw new IAE("index[%d] >= size[%d] or < 0", index, size);
} }
return index; return index;
} }

View File

@ -19,35 +19,25 @@
package io.druid.segment.data; package io.druid.segment.data;
import io.druid.java.util.common.IAE;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
/**
* Reusable IndexedInts that represents a sequence of a solo value [X].
*/
public final class SingleIndexedInt implements IndexedInts public final class SingleIndexedInt implements IndexedInts
{ {
private static final int CACHE_SIZE = 128; private int value;
private static final SingleIndexedInt[] CACHE = new SingleIndexedInt[CACHE_SIZE];
static { public SingleIndexedInt()
for (int i = 0; i < CACHE_SIZE; i++) { {
CACHE[i] = new SingleIndexedInt(i);
}
} }
private final int value; public void setValue(int value)
private SingleIndexedInt(int value)
{ {
this.value = value; this.value = value;
} }
public static SingleIndexedInt of(int value)
{
if (value >= 0 && value < CACHE_SIZE) {
return CACHE[value];
} else {
return new SingleIndexedInt(value);
}
}
@Override @Override
public int size() public int size()
{ {
@ -58,7 +48,7 @@ public final class SingleIndexedInt implements IndexedInts
public int get(int i) public int get(int i)
{ {
if (i != 0) { if (i != 0) {
throw new IllegalArgumentException(i + " != 0"); throw new IAE("%d != 0", i);
} }
return value; return value;
} }

View File

@ -0,0 +1,68 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.data;
import io.druid.java.util.common.IAE;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
/**
* Reusable IndexedInts, that could represent a sub-sequence ("slice") in a larger IndexedInts object. Used in
* {@link CompressedVSizeColumnarMultiIntsSupplier} implementation.
*
* Unsafe for concurrent use from multiple threads.
*/
public final class SliceIndexedInts implements IndexedInts
{
private final IndexedInts base;
private int offset = -1;
private int size = -1;
public SliceIndexedInts(IndexedInts base)
{
this.base = base;
}
public void setValues(int offset, int size)
{
this.offset = offset;
this.size = size;
}
@Override
public int size()
{
return size;
}
@Override
public int get(int index)
{
if (index < 0 || index >= size) {
throw new IAE("Index[%d] >= size[%d] or < 0", index, size);
}
return base.get(offset + index);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("base", base);
}
}

View File

@ -31,6 +31,7 @@ import io.druid.segment.DimensionSelectorUtils;
import io.druid.segment.IdLookup; import io.druid.segment.IdLookup;
import io.druid.segment.data.IndexedInts; import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.SingleIndexedInt; import io.druid.segment.data.SingleIndexedInt;
import io.druid.segment.data.ZeroIndexedInts;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -42,6 +43,7 @@ public class SingleStringInputDimensionSelector implements DimensionSelector
private final DimensionSelector selector; private final DimensionSelector selector;
private final Expr expression; private final Expr expression;
private final SingleInputBindings bindings = new SingleInputBindings(); private final SingleInputBindings bindings = new SingleInputBindings();
private final SingleIndexedInt nullAdjustedRow = new SingleIndexedInt();
/** /**
* 0 if selector has null as a value; 1 if it doesn't. * 0 if selector has null as a value; 1 if it doesn't.
@ -90,12 +92,13 @@ public class SingleStringInputDimensionSelector implements DimensionSelector
if (nullAdjustment == 0) { if (nullAdjustment == 0) {
return row; return row;
} else { } else {
return SingleIndexedInt.of(row.get(0) + nullAdjustment); nullAdjustedRow.setValue(row.get(0) + nullAdjustment);
return nullAdjustedRow;
} }
} else { } else {
// Can't handle non-singly-valued rows in expressions. // Can't handle non-singly-valued rows in expressions.
// Treat them as nulls until we think of something better to do. // Treat them as nulls until we think of something better to do.
return SingleIndexedInt.of(0); return ZeroIndexedInts.instance();
} }
} }

View File

@ -100,11 +100,13 @@ public class FilteredAggregatorTest
@Override @Override
public IndexedInts getRow() public IndexedInts getRow()
{ {
SingleIndexedInt row = new SingleIndexedInt();
if (selector.getIndex() % 3 == 2) { if (selector.getIndex() % 3 == 2) {
return SingleIndexedInt.of(1); row.setValue(1);
} else { } else {
return SingleIndexedInt.of(0); row.setValue(0);
} }
return row;
} }
@Override @Override

View File

@ -47,7 +47,7 @@ class TestDimensionSelector implements DimensionSelector
@Override @Override
public IndexedInts getRow() public IndexedInts getRow()
{ {
return ArrayBasedIndexedInts.of(new int[]{2, 4, 6}); return new ArrayBasedIndexedInts(new int[]{2, 4, 6});
} }
@Override @Override

View File

@ -43,7 +43,7 @@ public class IndexedIntsTest
return Arrays.asList( return Arrays.asList(
new Object[][]{ new Object[][]{
{VSizeColumnarInts.fromArray(array)}, {VSizeColumnarInts.fromArray(array)},
{ArrayBasedIndexedInts.of(array)} {new ArrayBasedIndexedInts(array)}
} }
); );
} }

View File

@ -127,7 +127,7 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest
new V3CompressedVSizeColumnarMultiIntsSerializer(offsetWriter, valueWriter); new V3CompressedVSizeColumnarMultiIntsSerializer(offsetWriter, valueWriter);
V3CompressedVSizeColumnarMultiIntsSupplier supplierFromIterable = V3CompressedVSizeColumnarMultiIntsSupplier supplierFromIterable =
V3CompressedVSizeColumnarMultiIntsSupplier.fromIterable( V3CompressedVSizeColumnarMultiIntsSupplier.fromIterable(
Iterables.transform(vals, ArrayBasedIndexedInts::of), Iterables.transform(vals, ArrayBasedIndexedInts::new),
offsetChunkFactor, offsetChunkFactor,
maxValue, maxValue,
byteOrder, byteOrder,