diff --git a/processing/src/main/java/io/druid/query/dimension/ForwardingFilteredDimensionSelector.java b/processing/src/main/java/io/druid/query/dimension/ForwardingFilteredDimensionSelector.java index 880616ef6a2..e64c10e0dc8 100644 --- a/processing/src/main/java/io/druid/query/dimension/ForwardingFilteredDimensionSelector.java +++ b/processing/src/main/java/io/druid/query/dimension/ForwardingFilteredDimensionSelector.java @@ -42,6 +42,7 @@ final class ForwardingFilteredDimensionSelector implements DimensionSelector, Id private final IdLookup baseIdLookup; private final Int2IntOpenHashMap forwardMapping; private final int[] reverseMapping; + private final ArrayBasedIndexedInts row = new ArrayBasedIndexedInts(); /** * @param selector must return true from {@link DimensionSelector#nameLookupPossibleInAdvance()} @@ -70,15 +71,17 @@ final class ForwardingFilteredDimensionSelector implements DimensionSelector, Id { IndexedInts baseRow = selector.getRow(); int baseRowSize = baseRow.size(); - int[] result = new int[baseRowSize]; + row.ensureSize(baseRowSize); int resultSize = 0; for (int i = 0; i < baseRowSize; i++) { int forwardedValue = forwardMapping.get(baseRow.get(i)); if (forwardedValue >= 0) { - result[resultSize++] = forwardedValue; + row.setValue(resultSize, forwardedValue); + resultSize++; } } - return ArrayBasedIndexedInts.of(result, resultSize); + row.setSize(resultSize); + return row; } @Override diff --git a/processing/src/main/java/io/druid/query/dimension/PredicateFilteredDimensionSelector.java b/processing/src/main/java/io/druid/query/dimension/PredicateFilteredDimensionSelector.java index 0155ccd98d2..32143a85723 100644 --- a/processing/src/main/java/io/druid/query/dimension/PredicateFilteredDimensionSelector.java +++ b/processing/src/main/java/io/druid/query/dimension/PredicateFilteredDimensionSelector.java @@ -34,6 +34,7 @@ final class PredicateFilteredDimensionSelector implements DimensionSelector { private final DimensionSelector selector; private final Predicate predicate; + private final ArrayBasedIndexedInts row = new ArrayBasedIndexedInts(); PredicateFilteredDimensionSelector(DimensionSelector selector, Predicate predicate) { @@ -46,14 +47,16 @@ final class PredicateFilteredDimensionSelector implements DimensionSelector { IndexedInts baseRow = selector.getRow(); int baseRowSize = baseRow.size(); - int[] result = new int[baseRowSize]; + row.ensureSize(baseRowSize); int resultSize = 0; for (int i = 0; i < baseRowSize; i++) { if (predicate.apply(selector.lookupName(baseRow.get(i)))) { - result[resultSize++] = i; + row.setValue(resultSize, i); + resultSize++; } } - return ArrayBasedIndexedInts.of(result, resultSize); + row.setSize(resultSize); + return row; } @Override diff --git a/processing/src/main/java/io/druid/query/groupby/RowBasedColumnSelectorFactory.java b/processing/src/main/java/io/druid/query/groupby/RowBasedColumnSelectorFactory.java index fcc62546f47..70e53fa37d3 100644 --- a/processing/src/main/java/io/druid/query/groupby/RowBasedColumnSelectorFactory.java +++ b/processing/src/main/java/io/druid/query/groupby/RowBasedColumnSelectorFactory.java @@ -203,11 +203,14 @@ public class RowBasedColumnSelectorFactory implements ColumnSelectorFactory } else { return new DimensionSelector() { + private final RangeIndexedInts indexedInts = new RangeIndexedInts(); + @Override public IndexedInts getRow() { final List dimensionValues = row.get().getDimension(dimension); - return RangeIndexedInts.create(dimensionValues != null ? dimensionValues.size() : 0); + indexedInts.setSize(dimensionValues != null ? dimensionValues.size() : 0); + return indexedInts; } @Override diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/column/DictionaryBuildingStringGroupByColumnSelectorStrategy.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/column/DictionaryBuildingStringGroupByColumnSelectorStrategy.java index 503f80b6bd2..db58c36e057 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/column/DictionaryBuildingStringGroupByColumnSelectorStrategy.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/column/DictionaryBuildingStringGroupByColumnSelectorStrategy.java @@ -68,21 +68,26 @@ public class DictionaryBuildingStringGroupByColumnSelectorStrategy extends Strin { final DimensionSelector dimSelector = (DimensionSelector) selector; final IndexedInts row = dimSelector.getRow(); - final int[] newIds = new int[row.size()]; - + ArrayBasedIndexedInts newRow = (ArrayBasedIndexedInts) valuess[columnIndex]; + if (newRow == null) { + newRow = new ArrayBasedIndexedInts(); + valuess[columnIndex] = newRow; + } + int rowSize = row.size(); + newRow.ensureSize(rowSize); for (int i = 0; i < row.size(); i++) { final String value = dimSelector.lookupName(row.get(i)); final int dictId = reverseDictionary.getInt(value); if (dictId < 0) { dictionary.add(value); reverseDictionary.put(value, nextId); - newIds[i] = nextId; + newRow.setValue(i, nextId); nextId++; } else { - newIds[i] = dictId; + newRow.setValue(i, dictId); } } - valuess[columnIndex] = ArrayBasedIndexedInts.of(newIds); + newRow.setSize(rowSize); } @Override diff --git a/processing/src/main/java/io/druid/segment/ColumnValueSelector.java b/processing/src/main/java/io/druid/segment/ColumnValueSelector.java index efb8c27caab..c6a147d16c0 100644 --- a/processing/src/main/java/io/druid/segment/ColumnValueSelector.java +++ b/processing/src/main/java/io/druid/segment/ColumnValueSelector.java @@ -22,7 +22,8 @@ package io.druid.segment; import io.druid.guice.annotations.PublicApi; /** - * Base type for interfaces that manage column value selection, e.g. DimensionSelector, LongColumnSelector + * Base type for interfaces that manage column value selection, e.g. {@link DimensionSelector}, {@link + * LongColumnSelector}. * * This interface has methods to get the value in all primitive types, that have corresponding basic aggregators in * Druid: Sum, Min, Max, etc: {@link #getFloat()}, {@link #getDouble()} and {@link #getLong()} to support "polymorphic" diff --git a/processing/src/main/java/io/druid/segment/DimensionSelector.java b/processing/src/main/java/io/druid/segment/DimensionSelector.java index ae9c04f66f1..91e09907048 100644 --- a/processing/src/main/java/io/druid/segment/DimensionSelector.java +++ b/processing/src/main/java/io/druid/segment/DimensionSelector.java @@ -36,11 +36,15 @@ public interface DimensionSelector extends ColumnValueSelector, HotLoopCallee int CARDINALITY_UNKNOWN = -1; /** - * Gets all values for the row inside of an IntBuffer. I.e. one possible implementation could be + * Returns the indexed values at the current position in this DimensionSelector. * - * return IntBuffer.wrap(lookupExpansion(get()); - * - * @return all values for the row as an IntBuffer + * IMPORTANT. The returned {@link IndexedInts} object could generally be reused inside the implementation of + * DimensionSelector, i. e. this method could always return the same object for the same selector. Users + * of this API, such as {@link io.druid.query.aggregation.Aggregator#aggregate()}, {@link + * io.druid.query.aggregation.BufferAggregator#aggregate}, {@link io.druid.query.aggregation.AggregateCombiner#reset}, + * {@link io.druid.query.aggregation.AggregateCombiner#fold} should be prepared for that and not storing the object + * returned from this method in their state, assuming that the object will remain unchanged even when the position of + * the selector changes. This may not be the case. */ @CalledFromHotLoop IndexedInts getRow(); diff --git a/processing/src/main/java/io/druid/segment/SingleScanTimeDimSelector.java b/processing/src/main/java/io/druid/segment/SingleScanTimeDimSelector.java index 1d140304fcc..c04fb0c8b5c 100644 --- a/processing/src/main/java/io/druid/segment/SingleScanTimeDimSelector.java +++ b/processing/src/main/java/io/druid/segment/SingleScanTimeDimSelector.java @@ -38,6 +38,7 @@ public class SingleScanTimeDimSelector implements DimensionSelector private final boolean descending; private final List timeValues = new ArrayList<>(); + private final SingleIndexedInt row = new SingleIndexedInt(); private String currentValue = null; private long currentTimestamp = Long.MIN_VALUE; private int index = -1; @@ -61,7 +62,8 @@ public class SingleScanTimeDimSelector implements DimensionSelector @Override public IndexedInts getRow() { - return SingleIndexedInt.of(getDimensionValueIndex()); + row.setValue(getDimensionValueIndex()); + return row; } @Override diff --git a/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java b/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java index 71e1c3443fe..bbe89d60338 100644 --- a/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java +++ b/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java @@ -366,6 +366,7 @@ public class StringDimensionIndexer implements DimensionIndexer expansion.length) { - throw new IAE("Size[%s] should be between 0 and %s", size, expansion.length); - } - return new ArrayBasedIndexedInts(expansion, size); - } - - private final int[] expansion; - private final int size; - - private ArrayBasedIndexedInts(int[] expansion, int size) + public ArrayBasedIndexedInts(int[] expansion) { this.expansion = expansion; + this.size = expansion.length; + } + + public void ensureSize(int size) + { + if (expansion.length < size) { + expansion = new int[size]; + } + } + + public void setSize(int size) + { + if (size < 0 || size > expansion.length) { + throw new IAE("Size[%d] > expansion.length[%d] or < 0", size, expansion.length); + } this.size = size; } + /** + * Sets the values from the given array. The given values array is not reused and not prone to be mutated later. + * Instead, the values from this array are copied into an array which is internal to ArrayBasedIndexedInts. + */ + public void setValues(int[] values, int size) + { + if (size < 0 || size > values.length) { + throw new IAE("Size[%d] should be between 0 and %d", size, values.length); + } + ensureSize(size); + System.arraycopy(values, 0, expansion, 0, size); + this.size = size; + } + + public void setValue(int index, int value) + { + expansion[index] = value; + } + @Override public int size() { @@ -66,8 +85,8 @@ public final class ArrayBasedIndexedInts implements IndexedInts @Override public int get(int index) { - if (index >= size) { - throw new IndexOutOfBoundsException("index: " + index + ", size: " + size); + if (index < 0 || index >= size) { + throw new IAE("index[%d] >= size[%d] or < 0", index, size); } return expansion[index]; } diff --git a/processing/src/main/java/io/druid/segment/data/CompressedVSizeColumnarMultiIntsSupplier.java b/processing/src/main/java/io/druid/segment/data/CompressedVSizeColumnarMultiIntsSupplier.java index 19b1596cb59..4d90ccaeac8 100644 --- a/processing/src/main/java/io/druid/segment/data/CompressedVSizeColumnarMultiIntsSupplier.java +++ b/processing/src/main/java/io/druid/segment/data/CompressedVSizeColumnarMultiIntsSupplier.java @@ -147,10 +147,13 @@ public class CompressedVSizeColumnarMultiIntsSupplier implements WritableSupplie private final ColumnarInts offsets; private final ColumnarInts values; + private final SliceIndexedInts rowValues; + CompressedVSizeColumnarMultiInts(ColumnarInts offsets, ColumnarInts values) { this.offsets = offsets; this.values = values; + this.rowValues = new SliceIndexedInts(values); } @Override @@ -177,30 +180,8 @@ public class CompressedVSizeColumnarMultiIntsSupplier implements WritableSupplie { final int offset = offsets.get(index); final int size = offsets.get(index + 1) - offset; - - return new IndexedInts() - { - @Override - public int size() - { - return size; - } - - @Override - public int get(int index) - { - if (index >= size) { - throw new IAE("Index[%d] >= size[%d]", index, size); - } - return values.get(index + offset); - } - - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - inspector.visit("values", values); - } - }; + rowValues.setValues(offset, size); + return rowValues; } @Override diff --git a/processing/src/main/java/io/druid/segment/data/RangeIndexedInts.java b/processing/src/main/java/io/druid/segment/data/RangeIndexedInts.java index 14d2d89b3a1..213e96f7808 100644 --- a/processing/src/main/java/io/druid/segment/data/RangeIndexedInts.java +++ b/processing/src/main/java/io/druid/segment/data/RangeIndexedInts.java @@ -19,40 +19,28 @@ package io.druid.segment.data; -import com.google.common.base.Preconditions; +import io.druid.java.util.common.IAE; import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; /** - * An IndexedInts that always returns [0, 1, ..., N]. + * Reusable IndexedInts that returns sequences [0, 1, ..., N]. */ public class RangeIndexedInts implements IndexedInts { - private static final int CACHE_LIMIT = 8; - private static final RangeIndexedInts[] CACHE = new RangeIndexedInts[CACHE_LIMIT]; + private int size; - static { - for (int i = 0; i < CACHE_LIMIT; i++) { - CACHE[i] = new RangeIndexedInts(i); - } + public RangeIndexedInts() + { } - private final int size; - - private RangeIndexedInts(int size) + public void setSize(int size) { + if (size < 0) { + throw new IAE("Size[%d] must be non-negative", size); + } this.size = size; } - public static RangeIndexedInts create(final int size) - { - Preconditions.checkArgument(size >= 0, "size >= 0"); - if (size < CACHE_LIMIT) { - return CACHE[size]; - } else { - return new RangeIndexedInts(size); - } - } - @Override public int size() { @@ -63,7 +51,7 @@ public class RangeIndexedInts implements IndexedInts public int get(int index) { if (index < 0 || index >= size) { - throw new IndexOutOfBoundsException("index: " + index); + throw new IAE("index[%d] >= size[%d] or < 0", index, size); } return index; } diff --git a/processing/src/main/java/io/druid/segment/data/SingleIndexedInt.java b/processing/src/main/java/io/druid/segment/data/SingleIndexedInt.java index 9bb93311a02..cff83e1bd37 100644 --- a/processing/src/main/java/io/druid/segment/data/SingleIndexedInt.java +++ b/processing/src/main/java/io/druid/segment/data/SingleIndexedInt.java @@ -19,35 +19,25 @@ package io.druid.segment.data; +import io.druid.java.util.common.IAE; import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; +/** + * Reusable IndexedInts that represents a sequence of a solo value [X]. + */ public final class SingleIndexedInt implements IndexedInts { - private static final int CACHE_SIZE = 128; - private static final SingleIndexedInt[] CACHE = new SingleIndexedInt[CACHE_SIZE]; + private int value; - static { - for (int i = 0; i < CACHE_SIZE; i++) { - CACHE[i] = new SingleIndexedInt(i); - } + public SingleIndexedInt() + { } - private final int value; - - private SingleIndexedInt(int value) + public void setValue(int value) { this.value = value; } - public static SingleIndexedInt of(int value) - { - if (value >= 0 && value < CACHE_SIZE) { - return CACHE[value]; - } else { - return new SingleIndexedInt(value); - } - } - @Override public int size() { @@ -58,7 +48,7 @@ public final class SingleIndexedInt implements IndexedInts public int get(int i) { if (i != 0) { - throw new IllegalArgumentException(i + " != 0"); + throw new IAE("%d != 0", i); } return value; } diff --git a/processing/src/main/java/io/druid/segment/data/SliceIndexedInts.java b/processing/src/main/java/io/druid/segment/data/SliceIndexedInts.java new file mode 100644 index 00000000000..60aadef4d86 --- /dev/null +++ b/processing/src/main/java/io/druid/segment/data/SliceIndexedInts.java @@ -0,0 +1,68 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.data; + +import io.druid.java.util.common.IAE; +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; + +/** + * Reusable IndexedInts, that could represent a sub-sequence ("slice") in a larger IndexedInts object. Used in + * {@link CompressedVSizeColumnarMultiIntsSupplier} implementation. + * + * Unsafe for concurrent use from multiple threads. + */ +public final class SliceIndexedInts implements IndexedInts +{ + private final IndexedInts base; + private int offset = -1; + private int size = -1; + + public SliceIndexedInts(IndexedInts base) + { + this.base = base; + } + + public void setValues(int offset, int size) + { + this.offset = offset; + this.size = size; + } + + @Override + public int size() + { + return size; + } + + @Override + public int get(int index) + { + if (index < 0 || index >= size) { + throw new IAE("Index[%d] >= size[%d] or < 0", index, size); + } + return base.get(offset + index); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("base", base); + } +} diff --git a/processing/src/main/java/io/druid/segment/virtual/SingleStringInputDimensionSelector.java b/processing/src/main/java/io/druid/segment/virtual/SingleStringInputDimensionSelector.java index 360f9f2e3ee..8f070435cac 100644 --- a/processing/src/main/java/io/druid/segment/virtual/SingleStringInputDimensionSelector.java +++ b/processing/src/main/java/io/druid/segment/virtual/SingleStringInputDimensionSelector.java @@ -31,6 +31,7 @@ import io.druid.segment.DimensionSelectorUtils; import io.druid.segment.IdLookup; import io.druid.segment.data.IndexedInts; import io.druid.segment.data.SingleIndexedInt; +import io.druid.segment.data.ZeroIndexedInts; import javax.annotation.Nullable; @@ -42,6 +43,7 @@ public class SingleStringInputDimensionSelector implements DimensionSelector private final DimensionSelector selector; private final Expr expression; private final SingleInputBindings bindings = new SingleInputBindings(); + private final SingleIndexedInt nullAdjustedRow = new SingleIndexedInt(); /** * 0 if selector has null as a value; 1 if it doesn't. @@ -90,12 +92,13 @@ public class SingleStringInputDimensionSelector implements DimensionSelector if (nullAdjustment == 0) { return row; } else { - return SingleIndexedInt.of(row.get(0) + nullAdjustment); + nullAdjustedRow.setValue(row.get(0) + nullAdjustment); + return nullAdjustedRow; } } else { // Can't handle non-singly-valued rows in expressions. // Treat them as nulls until we think of something better to do. - return SingleIndexedInt.of(0); + return ZeroIndexedInts.instance(); } } diff --git a/processing/src/test/java/io/druid/query/aggregation/FilteredAggregatorTest.java b/processing/src/test/java/io/druid/query/aggregation/FilteredAggregatorTest.java index 41f49e5909d..584f08a2b81 100644 --- a/processing/src/test/java/io/druid/query/aggregation/FilteredAggregatorTest.java +++ b/processing/src/test/java/io/druid/query/aggregation/FilteredAggregatorTest.java @@ -100,11 +100,13 @@ public class FilteredAggregatorTest @Override public IndexedInts getRow() { + SingleIndexedInt row = new SingleIndexedInt(); if (selector.getIndex() % 3 == 2) { - return SingleIndexedInt.of(1); + row.setValue(1); } else { - return SingleIndexedInt.of(0); + row.setValue(0); } + return row; } @Override diff --git a/processing/src/test/java/io/druid/query/dimension/TestDimensionSelector.java b/processing/src/test/java/io/druid/query/dimension/TestDimensionSelector.java index 9ad9b1fb3e1..3969bfcb99c 100644 --- a/processing/src/test/java/io/druid/query/dimension/TestDimensionSelector.java +++ b/processing/src/test/java/io/druid/query/dimension/TestDimensionSelector.java @@ -47,7 +47,7 @@ class TestDimensionSelector implements DimensionSelector @Override public IndexedInts getRow() { - return ArrayBasedIndexedInts.of(new int[]{2, 4, 6}); + return new ArrayBasedIndexedInts(new int[]{2, 4, 6}); } @Override diff --git a/processing/src/test/java/io/druid/segment/data/IndexedIntsTest.java b/processing/src/test/java/io/druid/segment/data/IndexedIntsTest.java index a69272e1d51..8a903800a0e 100644 --- a/processing/src/test/java/io/druid/segment/data/IndexedIntsTest.java +++ b/processing/src/test/java/io/druid/segment/data/IndexedIntsTest.java @@ -43,7 +43,7 @@ public class IndexedIntsTest return Arrays.asList( new Object[][]{ {VSizeColumnarInts.fromArray(array)}, - {ArrayBasedIndexedInts.of(array)} + {new ArrayBasedIndexedInts(array)} } ); } diff --git a/processing/src/test/java/io/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java b/processing/src/test/java/io/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java index c7b17f22e74..446d7f0c1b6 100644 --- a/processing/src/test/java/io/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java +++ b/processing/src/test/java/io/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java @@ -127,7 +127,7 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest new V3CompressedVSizeColumnarMultiIntsSerializer(offsetWriter, valueWriter); V3CompressedVSizeColumnarMultiIntsSupplier supplierFromIterable = V3CompressedVSizeColumnarMultiIntsSupplier.fromIterable( - Iterables.transform(vals, ArrayBasedIndexedInts::of), + Iterables.transform(vals, ArrayBasedIndexedInts::new), offsetChunkFactor, maxValue, byteOrder,