diff --git a/common/src/main/java/io/druid/annotations/SubclassesMustBePublic.java b/common/src/main/java/io/druid/annotations/SubclassesMustBePublic.java new file mode 100644 index 00000000000..2d8771fe8f0 --- /dev/null +++ b/common/src/main/java/io/druid/annotations/SubclassesMustBePublic.java @@ -0,0 +1,35 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.annotations; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Documented +@Inherited +@Retention(RetentionPolicy.SOURCE) +@Target(ElementType.TYPE) +public @interface SubclassesMustBePublic +{ +} diff --git a/common/src/main/java/io/druid/annotations/SubclassesMustBePublicAnnotationProcessor.java b/common/src/main/java/io/druid/annotations/SubclassesMustBePublicAnnotationProcessor.java new file mode 100644 index 00000000000..2a8f3098107 --- /dev/null +++ b/common/src/main/java/io/druid/annotations/SubclassesMustBePublicAnnotationProcessor.java @@ -0,0 +1,51 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.annotations; + +import javax.annotation.processing.AbstractProcessor; +import javax.annotation.processing.RoundEnvironment; +import javax.annotation.processing.SupportedAnnotationTypes; +import javax.lang.model.element.Element; +import javax.lang.model.element.Modifier; +import javax.lang.model.element.TypeElement; +import javax.tools.Diagnostic; +import java.util.Set; + +@SupportedAnnotationTypes("io.druid.annotations.SubclassesMustBePublic") +public class SubclassesMustBePublicAnnotationProcessor extends AbstractProcessor +{ + @Override + public boolean process(Set annotations, RoundEnvironment roundEnv) + { + for (TypeElement annotation : annotations) { + Set elementsAnnotatedWith = roundEnv.getElementsAnnotatedWith(annotation); + for (Element element : elementsAnnotatedWith) { + if (!element.getModifiers().contains(Modifier.PUBLIC)) { + processingEnv.getMessager().printMessage( + Diagnostic.Kind.ERROR, + element.getSimpleName() + " must be public", + element + ); + } + } + } + return false; + } +} diff --git a/common/src/main/resources/services/javax.annotation.processing.Processor b/common/src/main/resources/services/javax.annotation.processing.Processor new file mode 100644 index 00000000000..b19a8c2290c --- /dev/null +++ b/common/src/main/resources/services/javax.annotation.processing.Processor @@ -0,0 +1 @@ +io.druid.annotations.SubclassesMustBePublicAnnotationProcessor \ No newline at end of file diff --git a/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/EmptyDistinctCountBufferAggregator.java b/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/EmptyDistinctCountBufferAggregator.java index 62e95a2c093..25096ab27b6 100644 --- a/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/EmptyDistinctCountBufferAggregator.java +++ b/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/EmptyDistinctCountBufferAggregator.java @@ -77,5 +77,6 @@ public final class EmptyDistinctCountBufferAggregator implements BufferAggregato @Override public void inspectRuntimeShape(RuntimeShapeInspector inspector) { + // nothing to inspect } } diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/EmptySketchBufferAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/EmptySketchBufferAggregator.java index 2014314675c..d97622d5ff8 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/EmptySketchBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/EmptySketchBufferAggregator.java @@ -73,5 +73,6 @@ public final class EmptySketchBufferAggregator implements BufferAggregator @Override public void inspectRuntimeShape(RuntimeShapeInspector inspector) { + // nothing to inspect } } diff --git a/processing/pom.xml b/processing/pom.xml index ee9564ebb20..335a7a17a3e 100644 --- a/processing/pom.xml +++ b/processing/pom.xml @@ -132,6 +132,22 @@ + + org.apache.maven.plugins + maven-compiler-plugin + + + + io.druid + druid-common + ${project.parent.version} + + + + io.druid.annotations.SubclassesMustBePublicAnnotationProcessor + + + org.apache.maven.plugins maven-jar-plugin diff --git a/processing/src/main/java/io/druid/query/aggregation/BufferAggregator.java b/processing/src/main/java/io/druid/query/aggregation/BufferAggregator.java index 16574ac0e3e..c45582e9e02 100644 --- a/processing/src/main/java/io/druid/query/aggregation/BufferAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/BufferAggregator.java @@ -126,6 +126,7 @@ public interface BufferAggregator extends HotLoopCallee @Override default void inspectRuntimeShape(RuntimeShapeInspector inspector) { + // nothing to inspect } /** diff --git a/processing/src/main/java/io/druid/query/aggregation/CountBufferAggregator.java b/processing/src/main/java/io/druid/query/aggregation/CountBufferAggregator.java index daedab3f585..ae8ee742ff7 100644 --- a/processing/src/main/java/io/druid/query/aggregation/CountBufferAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/CountBufferAggregator.java @@ -68,5 +68,6 @@ public class CountBufferAggregator implements BufferAggregator @Override public void inspectRuntimeShape(RuntimeShapeInspector inspector) { + // nothing to inspect } } diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleMaxBufferAggregator.java b/processing/src/main/java/io/druid/query/aggregation/DoubleMaxBufferAggregator.java index 7a08e5a0434..ed12dce4e30 100644 --- a/processing/src/main/java/io/druid/query/aggregation/DoubleMaxBufferAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/DoubleMaxBufferAggregator.java @@ -25,7 +25,7 @@ import java.nio.ByteBuffer; /** */ -public class DoubleMaxBufferAggregator extends DoubleBufferAggregator +public class DoubleMaxBufferAggregator extends SimpleDoubleBufferAggregator { DoubleMaxBufferAggregator(FloatColumnSelector selector) @@ -40,8 +40,18 @@ public class DoubleMaxBufferAggregator extends DoubleBufferAggregator } @Override - public void aggregate(ByteBuffer buf, int position) + public void putFirst(ByteBuffer buf, int position, double value) { - buf.putDouble(position, Math.max(buf.getDouble(position), (double) selector.get())); + if (!Double.isNaN(value)) { + buf.putDouble(position, value); + } else { + init(buf, position); + } + } + + @Override + public void aggregate(ByteBuffer buf, int position, double value) + { + buf.putDouble(position, Math.max(buf.getDouble(position), value)); } } diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleMinBufferAggregator.java b/processing/src/main/java/io/druid/query/aggregation/DoubleMinBufferAggregator.java index b11712818d2..427d3290b06 100644 --- a/processing/src/main/java/io/druid/query/aggregation/DoubleMinBufferAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/DoubleMinBufferAggregator.java @@ -25,7 +25,7 @@ import java.nio.ByteBuffer; /** */ -public class DoubleMinBufferAggregator extends DoubleBufferAggregator +public class DoubleMinBufferAggregator extends SimpleDoubleBufferAggregator { DoubleMinBufferAggregator(FloatColumnSelector selector) @@ -40,8 +40,18 @@ public class DoubleMinBufferAggregator extends DoubleBufferAggregator } @Override - public void aggregate(ByteBuffer buf, int position) + public void putFirst(ByteBuffer buf, int position, double value) { - buf.putDouble(position, Math.min(buf.getDouble(position), (double) selector.get())); + if (!Double.isNaN(value)) { + buf.putDouble(position, value); + } else { + init(buf, position); + } + } + + @Override + public void aggregate(ByteBuffer buf, int position, double value) + { + buf.putDouble(position, Math.min(buf.getDouble(position), value)); } } diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleSumBufferAggregator.java b/processing/src/main/java/io/druid/query/aggregation/DoubleSumBufferAggregator.java index 2c065ce49a7..600fa646ee2 100644 --- a/processing/src/main/java/io/druid/query/aggregation/DoubleSumBufferAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/DoubleSumBufferAggregator.java @@ -25,7 +25,7 @@ import java.nio.ByteBuffer; /** */ -public class DoubleSumBufferAggregator extends DoubleBufferAggregator +public class DoubleSumBufferAggregator extends SimpleDoubleBufferAggregator { DoubleSumBufferAggregator(FloatColumnSelector selector) @@ -40,8 +40,14 @@ public class DoubleSumBufferAggregator extends DoubleBufferAggregator } @Override - public void aggregate(ByteBuffer buf, int position) + public void putFirst(ByteBuffer buf, int position, double value) { - buf.putDouble(position, buf.getDouble(position) + (double) selector.get()); + buf.putDouble(position, value); + } + + @Override + public void aggregate(ByteBuffer buf, int position, double value) + { + buf.putDouble(position, buf.getDouble(position) + value); } } diff --git a/processing/src/main/java/io/druid/query/aggregation/NoopBufferAggregator.java b/processing/src/main/java/io/druid/query/aggregation/NoopBufferAggregator.java index fd4d6bd51d7..93ee651b449 100644 --- a/processing/src/main/java/io/druid/query/aggregation/NoopBufferAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/NoopBufferAggregator.java @@ -73,5 +73,6 @@ public final class NoopBufferAggregator implements BufferAggregator @Override public void inspectRuntimeShape(RuntimeShapeInspector inspector) { + // nothing to inspect } } diff --git a/processing/src/main/java/io/druid/query/aggregation/DoubleBufferAggregator.java b/processing/src/main/java/io/druid/query/aggregation/SimpleDoubleBufferAggregator.java similarity index 61% rename from processing/src/main/java/io/druid/query/aggregation/DoubleBufferAggregator.java rename to processing/src/main/java/io/druid/query/aggregation/SimpleDoubleBufferAggregator.java index 0e868674816..33090f5463c 100644 --- a/processing/src/main/java/io/druid/query/aggregation/DoubleBufferAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/SimpleDoubleBufferAggregator.java @@ -19,34 +19,57 @@ package io.druid.query.aggregation; +import io.druid.query.monomorphicprocessing.CalledFromHotLoop; import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import io.druid.segment.FloatColumnSelector; import java.nio.ByteBuffer; -public abstract class DoubleBufferAggregator implements BufferAggregator +public abstract class SimpleDoubleBufferAggregator implements BufferAggregator { protected final FloatColumnSelector selector; - DoubleBufferAggregator(FloatColumnSelector selector) + SimpleDoubleBufferAggregator(FloatColumnSelector selector) { this.selector = selector; } + public FloatColumnSelector getSelector() + { + return selector; + } + + /** + * Faster equivalent to + * aggregator.init(buf, position); + * aggregator.aggregate(buf, position, value); + */ + @CalledFromHotLoop + public abstract void putFirst(ByteBuffer buf, int position, double value); + + @CalledFromHotLoop + public abstract void aggregate(ByteBuffer buf, int position, double value); + @Override - public Object get(ByteBuffer buf, int position) + public final void aggregate(ByteBuffer buf, int position) + { + aggregate(buf, position, (double) selector.get()); + } + + @Override + public final Object get(ByteBuffer buf, int position) { return buf.getDouble(position); } @Override - public float getFloat(ByteBuffer buf, int position) + public final float getFloat(ByteBuffer buf, int position) { return (float) buf.getDouble(position); } @Override - public long getLong(ByteBuffer buf, int position) + public final long getLong(ByteBuffer buf, int position) { return (long) buf.getDouble(position); } diff --git a/processing/src/main/java/io/druid/query/dimension/ForwardingFilteredDimensionSelector.java b/processing/src/main/java/io/druid/query/dimension/ForwardingFilteredDimensionSelector.java index 52c489cce6e..fa19679fef6 100644 --- a/processing/src/main/java/io/druid/query/dimension/ForwardingFilteredDimensionSelector.java +++ b/processing/src/main/java/io/druid/query/dimension/ForwardingFilteredDimensionSelector.java @@ -31,7 +31,7 @@ import io.druid.segment.IdLookup; import io.druid.segment.data.ArrayBasedIndexedInts; import io.druid.segment.data.IndexedInts; import io.druid.segment.filter.BooleanValueMatcher; -import it.unimi.dsi.fastutil.ints.Int2IntMap; +import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap; import javax.annotation.Nullable; import java.util.BitSet; @@ -40,14 +40,18 @@ final class ForwardingFilteredDimensionSelector implements DimensionSelector, Id { private final DimensionSelector selector; private final IdLookup baseIdLookup; - private final Int2IntMap forwardMapping; + private final Int2IntOpenHashMap forwardMapping; private final int[] reverseMapping; /** * @param selector must return true from {@link DimensionSelector#nameLookupPossibleInAdvance()} - * @param forwardMapping must have {@link Int2IntMap#defaultReturnValue(int)} configured to -1. + * @param forwardMapping must have {@link Int2IntOpenHashMap#defaultReturnValue(int)} configured to -1. */ - ForwardingFilteredDimensionSelector(DimensionSelector selector, Int2IntMap forwardMapping, int[] reverseMapping) + ForwardingFilteredDimensionSelector( + DimensionSelector selector, + Int2IntOpenHashMap forwardMapping, + int[] reverseMapping + ) { this.selector = Preconditions.checkNotNull(selector); if (!selector.nameLookupPossibleInAdvance()) { @@ -106,6 +110,12 @@ final class ForwardingFilteredDimensionSelector implements DimensionSelector, Id // null should match empty rows in multi-value columns return nullRow && value == null; } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("selector", selector); + } }; } else { return BooleanValueMatcher.of(false); @@ -141,6 +151,12 @@ final class ForwardingFilteredDimensionSelector implements DimensionSelector, Id // null should match empty rows in multi-value columns return nullRow && matchNull; } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("selector", selector); + } }; } @@ -179,6 +195,5 @@ final class ForwardingFilteredDimensionSelector implements DimensionSelector, Id public void inspectRuntimeShape(RuntimeShapeInspector inspector) { inspector.visit("selector", selector); - inspector.visit("forwardMapping", forwardMapping); } } diff --git a/processing/src/main/java/io/druid/query/dimension/ListFilteredDimensionSpec.java b/processing/src/main/java/io/druid/query/dimension/ListFilteredDimensionSpec.java index c1234f6ebcf..c7ab3873b57 100644 --- a/processing/src/main/java/io/druid/query/dimension/ListFilteredDimensionSpec.java +++ b/processing/src/main/java/io/druid/query/dimension/ListFilteredDimensionSpec.java @@ -28,7 +28,6 @@ import io.druid.java.util.common.StringUtils; import io.druid.query.filter.DimFilterUtils; import io.druid.segment.DimensionSelector; import io.druid.segment.IdLookup; -import it.unimi.dsi.fastutil.ints.Int2IntMap; import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap; import javax.annotation.Nullable; @@ -93,7 +92,7 @@ public class ListFilteredDimensionSpec extends BaseFilteredDimensionSpec } final int maxPossibleFilteredCardinality = values.size(); int count = 0; - final Int2IntMap forwardMapping = new Int2IntOpenHashMap(maxPossibleFilteredCardinality); + final Int2IntOpenHashMap forwardMapping = new Int2IntOpenHashMap(maxPossibleFilteredCardinality); forwardMapping.defaultReturnValue(-1); final int[] reverseMapping = new int[maxPossibleFilteredCardinality]; IdLookup idLookup = selector.idLookup(); @@ -134,7 +133,7 @@ public class ListFilteredDimensionSpec extends BaseFilteredDimensionSpec } final int maxPossibleFilteredCardinality = selectorCardinality; int count = 0; - final Int2IntMap forwardMapping = new Int2IntOpenHashMap(maxPossibleFilteredCardinality); + final Int2IntOpenHashMap forwardMapping = new Int2IntOpenHashMap(maxPossibleFilteredCardinality); forwardMapping.defaultReturnValue(-1); final int[] reverseMapping = new int[maxPossibleFilteredCardinality]; for (int i = 0; i < selectorCardinality; i++) { diff --git a/processing/src/main/java/io/druid/query/dimension/PredicateFilteredDimensionSelector.java b/processing/src/main/java/io/druid/query/dimension/PredicateFilteredDimensionSelector.java index 44b9fc34747..a99eda5c521 100644 --- a/processing/src/main/java/io/druid/query/dimension/PredicateFilteredDimensionSelector.java +++ b/processing/src/main/java/io/druid/query/dimension/PredicateFilteredDimensionSelector.java @@ -79,6 +79,13 @@ final class PredicateFilteredDimensionSelector implements DimensionSelector // null should match empty rows in multi-value columns return nullRow && value == null; } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + // PredicateFilteredDimensionSelector.this inspects selector and predicate as well. + inspector.visit("selector", PredicateFilteredDimensionSelector.this); + } }; } @@ -106,6 +113,14 @@ final class PredicateFilteredDimensionSelector implements DimensionSelector // null should match empty rows in multi-value columns return nullRow && matchNull; } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + // PredicateFilteredDimensionSelector.this inspects selector and predicate as well. + inspector.visit("selector", PredicateFilteredDimensionSelector.this); + inspector.visit("matcherPredicate", matcherPredicate); + } }; } diff --git a/processing/src/main/java/io/druid/query/dimension/RegexFilteredDimensionSpec.java b/processing/src/main/java/io/druid/query/dimension/RegexFilteredDimensionSpec.java index 3cfd7361488..4ebd407083f 100644 --- a/processing/src/main/java/io/druid/query/dimension/RegexFilteredDimensionSpec.java +++ b/processing/src/main/java/io/druid/query/dimension/RegexFilteredDimensionSpec.java @@ -83,7 +83,7 @@ public class RegexFilteredDimensionSpec extends BaseFilteredDimensionSpec } int count = 0; - final Int2IntMap forwardMapping = new Int2IntOpenHashMap(); + final Int2IntOpenHashMap forwardMapping = new Int2IntOpenHashMap(); forwardMapping.defaultReturnValue(-1); for (int i = 0; i < selectorCardinality; i++) { if (compiledRegex.matcher(Strings.nullToEmpty(selector.lookupName(i))).matches()) { diff --git a/processing/src/main/java/io/druid/query/filter/FloatValueMatcherColumnSelectorStrategy.java b/processing/src/main/java/io/druid/query/filter/FloatValueMatcherColumnSelectorStrategy.java index 2e16149c103..b209f1b6534 100644 --- a/processing/src/main/java/io/druid/query/filter/FloatValueMatcherColumnSelectorStrategy.java +++ b/processing/src/main/java/io/druid/query/filter/FloatValueMatcherColumnSelectorStrategy.java @@ -19,6 +19,7 @@ package io.druid.query.filter; +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import io.druid.segment.DimensionHandlerUtils; import io.druid.segment.FloatColumnSelector; import io.druid.segment.filter.BooleanValueMatcher; @@ -41,6 +42,12 @@ public class FloatValueMatcherColumnSelectorStrategy implements ValueMatcherColu { return Float.floatToIntBits(selector.get()) == matchValIntBits; } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("selector", selector); + } }; } @@ -57,6 +64,13 @@ public class FloatValueMatcherColumnSelectorStrategy implements ValueMatcherColu { return predicate.applyFloat(selector.get()); } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("selector", selector); + inspector.visit("predicate", predicate); + } }; } diff --git a/processing/src/main/java/io/druid/query/filter/LongValueMatcherColumnSelectorStrategy.java b/processing/src/main/java/io/druid/query/filter/LongValueMatcherColumnSelectorStrategy.java index a0a2f59bf0d..3995d9e0ae7 100644 --- a/processing/src/main/java/io/druid/query/filter/LongValueMatcherColumnSelectorStrategy.java +++ b/processing/src/main/java/io/druid/query/filter/LongValueMatcherColumnSelectorStrategy.java @@ -19,6 +19,7 @@ package io.druid.query.filter; +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import io.druid.segment.DimensionHandlerUtils; import io.druid.segment.LongColumnSelector; import io.druid.segment.filter.BooleanValueMatcher; @@ -40,6 +41,12 @@ public class LongValueMatcherColumnSelectorStrategy implements ValueMatcherColum { return selector.get() == matchValLong; } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("selector", selector); + } }; } @@ -56,6 +63,13 @@ public class LongValueMatcherColumnSelectorStrategy implements ValueMatcherColum { return predicate.applyLong(selector.get()); } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("selector", selector); + inspector.visit("predicate", predicate); + } }; } diff --git a/processing/src/main/java/io/druid/query/filter/ValueMatcher.java b/processing/src/main/java/io/druid/query/filter/ValueMatcher.java index b701f3ade38..cad255474d8 100644 --- a/processing/src/main/java/io/druid/query/filter/ValueMatcher.java +++ b/processing/src/main/java/io/druid/query/filter/ValueMatcher.java @@ -19,9 +19,13 @@ package io.druid.query.filter; +import io.druid.query.monomorphicprocessing.CalledFromHotLoop; +import io.druid.query.monomorphicprocessing.HotLoopCallee; + /** */ -public interface ValueMatcher +public interface ValueMatcher extends HotLoopCallee { - public boolean matches(); + @CalledFromHotLoop + boolean matches(); } diff --git a/processing/src/main/java/io/druid/query/groupby/RowBasedColumnSelectorFactory.java b/processing/src/main/java/io/druid/query/groupby/RowBasedColumnSelectorFactory.java index d8d15070a8b..3fcb2434ae9 100644 --- a/processing/src/main/java/io/druid/query/groupby/RowBasedColumnSelectorFactory.java +++ b/processing/src/main/java/io/druid/query/groupby/RowBasedColumnSelectorFactory.java @@ -34,6 +34,7 @@ import io.druid.segment.FloatColumnSelector; import io.druid.segment.IdLookup; import io.druid.segment.LongColumnSelector; import io.druid.segment.ObjectColumnSelector; +import io.druid.segment.SingleValueDimensionSelector; import io.druid.segment.column.Column; import io.druid.segment.column.ColumnCapabilities; import io.druid.segment.column.ColumnCapabilitiesImpl; @@ -105,7 +106,7 @@ public class RowBasedColumnSelectorFactory implements ColumnSelectorFactory throw new UnsupportedOperationException("time dimension must provide an extraction function"); } - return new DimensionSelector() + return new SingleValueDimensionSelector() { @Override public IndexedInts getRow() @@ -113,6 +114,12 @@ public class RowBasedColumnSelectorFactory implements ColumnSelectorFactory return ZeroIndexedInts.instance(); } + @Override + public int getRowValue() + { + return 0; + } + @Override public ValueMatcher makeValueMatcher(final String value) { @@ -124,6 +131,13 @@ public class RowBasedColumnSelectorFactory implements ColumnSelectorFactory String rowValue = extractionFn.apply(row.get().getTimestampFromEpoch()); return Objects.equals(rowValue, value); } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("row", row); + inspector.visit("extractionFn", extractionFn); + } }; } @@ -138,6 +152,14 @@ public class RowBasedColumnSelectorFactory implements ColumnSelectorFactory String rowValue = extractionFn.apply(row.get().getTimestampFromEpoch()); return predicate.apply(rowValue); } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("row", row); + inspector.visit("extractionFn", extractionFn); + inspector.visit("predicate", predicate); + } }; } @@ -204,6 +226,12 @@ public class RowBasedColumnSelectorFactory implements ColumnSelectorFactory } return false; } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("row", row); + } }; } else { return new ValueMatcher() @@ -223,6 +251,13 @@ public class RowBasedColumnSelectorFactory implements ColumnSelectorFactory } return false; } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("row", row); + inspector.visit("extractionFn", extractionFn); + } }; } } @@ -249,6 +284,13 @@ public class RowBasedColumnSelectorFactory implements ColumnSelectorFactory } return false; } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("row", row); + inspector.visit("predicate", predicate); + } }; } else { return new ValueMatcher() @@ -268,6 +310,13 @@ public class RowBasedColumnSelectorFactory implements ColumnSelectorFactory } return false; } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("row", row); + inspector.visit("predicate", predicate); + } }; } } diff --git a/processing/src/main/java/io/druid/query/monomorphicprocessing/HotLoopCallee.java b/processing/src/main/java/io/druid/query/monomorphicprocessing/HotLoopCallee.java index af82900fc80..19aa3f4219e 100644 --- a/processing/src/main/java/io/druid/query/monomorphicprocessing/HotLoopCallee.java +++ b/processing/src/main/java/io/druid/query/monomorphicprocessing/HotLoopCallee.java @@ -37,7 +37,7 @@ public interface HotLoopCallee * this instance (the instance on which inspectRuntimeShape() is called) is configured. * d. ByteBuffer or similar objects, where byte order matters * e. boolean flags, affecting branch taking - * f. Arrays of objects, meeting conditions any of conditions a-e. + * f. Arrays of objects, meeting any of conditions a-e. */ void inspectRuntimeShape(RuntimeShapeInspector inspector); } diff --git a/processing/src/main/java/io/druid/query/monomorphicprocessing/SpecializationService.java b/processing/src/main/java/io/druid/query/monomorphicprocessing/SpecializationService.java index e2d00b05031..5b590b47231 100644 --- a/processing/src/main/java/io/druid/query/monomorphicprocessing/SpecializationService.java +++ b/processing/src/main/java/io/druid/query/monomorphicprocessing/SpecializationService.java @@ -100,6 +100,7 @@ public final class SpecializationService * specialization takes some JVM memory (machine code cache, byte code, etc.) */ private static final int maxSpecializations = Integer.getInteger("maxSpecializations", 1000); + private static final AtomicBoolean maxSpecializationsWarningEmitted = new AtomicBoolean(false); private static final ExecutorService classSpecializationExecutor = Execs.singleThreaded("class-specialization-%d"); @@ -124,7 +125,7 @@ public final class SpecializationService String runtimeShape ) { - return getSpecializationState(prototypeClass, runtimeShape, ImmutableMap., Class>of()); + return getSpecializationState(prototypeClass, runtimeShape, ImmutableMap.of()); } /** @@ -292,6 +293,8 @@ public final class SpecializationService @Override public T getSpecialized() { + // Returns null because the class is not yet specialized. The purpose of WindowedLoopIterationCounter is to decide + // whether specialization should be done, or not. return null; } @@ -329,7 +332,7 @@ public final class SpecializationService } } if (!currentMinutePresent) { - perMinuteIterations.computeIfAbsent(currentMinute, AtomicLong::new).addAndGet(newIterations); + perMinuteIterations.computeIfAbsent(currentMinute, m -> new AtomicLong()).addAndGet(newIterations); totalIterations += newIterations; } return totalIterations; @@ -346,18 +349,33 @@ public final class SpecializationService // PerPrototypeClassState.specializationStates. But it might be that nobody ever hits even the current // maxSpecializations limit, so implementing cache eviction is an unnecessary complexity. specialized = perPrototypeClassState.prototypeClass.newInstance(); - LOG.warn( - "SpecializationService couldn't make more than [%d] specializations. " - + "Not doing specialization for runtime shape[%s] and class remapping[%s], using the prototype class[%s]", - maxSpecializations, - specializationId.runtimeShape, - specializationId.classRemapping, - perPrototypeClassState.prototypeClass - ); + if (!maxSpecializationsWarningEmitted.get() && maxSpecializationsWarningEmitted.compareAndSet(false, true)) { + LOG.warn( + "SpecializationService couldn't make more than [%d] specializations. " + + "Not doing specialization for runtime shape[%s] and class remapping[%s], using the prototype class[%s]", + maxSpecializations, + specializationId.runtimeShape, + specializationId.classRemapping, + perPrototypeClassState.prototypeClass + ); + } } else if (fakeSpecialize) { specialized = perPrototypeClassState.prototypeClass.newInstance(); + LOG.info( + "Not specializing prototype class[%s] for runtime shape[%s] and class remapping[%s] because " + + "fakeSpecialize=true, using the prototype class instead", + perPrototypeClassState.prototypeClass, + specializationId.runtimeShape, + specializationId.classRemapping + ); } else { specialized = perPrototypeClassState.specialize(specializationId.classRemapping); + LOG.info( + "Specializing prototype class[%s] for runtime shape[%s] and class remapping[%s]", + perPrototypeClassState.prototypeClass, + specializationId.runtimeShape, + specializationId.classRemapping + ); } perPrototypeClassState.specializationStates.put(specializationId, new Specialized<>(specialized)); } diff --git a/processing/src/main/java/io/druid/query/topn/Historical1AggPooledTopNScanner.java b/processing/src/main/java/io/druid/query/topn/Historical1AggPooledTopNScanner.java new file mode 100644 index 00000000000..fc1691a1078 --- /dev/null +++ b/processing/src/main/java/io/druid/query/topn/Historical1AggPooledTopNScanner.java @@ -0,0 +1,48 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.topn; + +import io.druid.query.aggregation.BufferAggregator; +import io.druid.segment.ColumnValueSelector; +import io.druid.segment.historical.HistoricalCursor; +import io.druid.segment.historical.HistoricalDimensionSelector; + +import java.nio.ByteBuffer; + +public interface Historical1AggPooledTopNScanner< + DimensionSelectorType extends HistoricalDimensionSelector, + MetricSelectorType extends ColumnValueSelector, + BufferAggregatorType extends BufferAggregator> +{ + /** + * @param aggregatorSize number of bytes required by aggregator for a single aggregation + * @param positions a cache for positions in resultsBuffer, where specific (indexed) dimension values are aggregated + * @return number of scanned rows, i. e. number of steps made with the given cursor + */ + long scanAndAggregate( + DimensionSelectorType dimensionSelector, + MetricSelectorType metricSelector, + BufferAggregatorType aggregator, + int aggregatorSize, + HistoricalCursor cursor, + int[] positions, + ByteBuffer resultsBuffer + ); +} diff --git a/processing/src/main/java/io/druid/query/topn/Historical1SimpleDoubleAggPooledTopNScannerPrototype.java b/processing/src/main/java/io/druid/query/topn/Historical1SimpleDoubleAggPooledTopNScannerPrototype.java new file mode 100644 index 00000000000..e7d8aaf52a6 --- /dev/null +++ b/processing/src/main/java/io/druid/query/topn/Historical1SimpleDoubleAggPooledTopNScannerPrototype.java @@ -0,0 +1,74 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.topn; + +import io.druid.query.aggregation.SimpleDoubleBufferAggregator; +import io.druid.segment.data.IndexedInts; +import io.druid.segment.data.Offset; +import io.druid.segment.historical.HistoricalCursor; +import io.druid.segment.historical.HistoricalDimensionSelector; +import io.druid.segment.historical.HistoricalFloatColumnSelector; + +import java.nio.ByteBuffer; + +public class Historical1SimpleDoubleAggPooledTopNScannerPrototype + implements Historical1AggPooledTopNScanner< + HistoricalDimensionSelector, + HistoricalFloatColumnSelector, + SimpleDoubleBufferAggregator + > +{ + @Override + public long scanAndAggregate( + HistoricalDimensionSelector dimensionSelector, + HistoricalFloatColumnSelector metricSelector, + SimpleDoubleBufferAggregator aggregator, + int aggregatorSize, + HistoricalCursor cursor, + int[] positions, + ByteBuffer resultsBuffer + ) + { + // See TopNUtils.copyOffset() for explanation + Offset offset = (Offset) TopNUtils.copyOffset(cursor); + long scannedRows = 0; + int positionToAllocate = 0; + while (offset.withinBounds() && !Thread.currentThread().isInterrupted()) { + int rowNum = offset.getOffset(); + double metric = metricSelector.get(rowNum); + final IndexedInts dimValues = dimensionSelector.getRow(rowNum); + final int dimSize = dimValues.size(); + for (int i = 0; i < dimSize; i++) { + int dimIndex = dimValues.get(i); + int position = positions[dimIndex]; + if (position >= 0) { + aggregator.aggregate(resultsBuffer, position, metric); + } else if (position == TopNAlgorithm.INIT_POSITION_VALUE) { + positions[dimIndex] = positionToAllocate; + aggregator.putFirst(resultsBuffer, positionToAllocate, metric); + positionToAllocate += aggregatorSize; + } + } + scannedRows++; + offset.increment(); + } + return scannedRows; + } +} diff --git a/processing/src/main/java/io/druid/query/topn/HistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopNScannerPrototype.java b/processing/src/main/java/io/druid/query/topn/HistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopNScannerPrototype.java new file mode 100644 index 00000000000..91f77c4a809 --- /dev/null +++ b/processing/src/main/java/io/druid/query/topn/HistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopNScannerPrototype.java @@ -0,0 +1,68 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.topn; + +import io.druid.query.aggregation.SimpleDoubleBufferAggregator; +import io.druid.segment.data.Offset; +import io.druid.segment.historical.HistoricalCursor; +import io.druid.segment.historical.HistoricalFloatColumnSelector; +import io.druid.segment.historical.SingleValueHistoricalDimensionSelector; + +import java.nio.ByteBuffer; + +public class HistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopNScannerPrototype + implements Historical1AggPooledTopNScanner< + SingleValueHistoricalDimensionSelector, + HistoricalFloatColumnSelector, + SimpleDoubleBufferAggregator + > +{ + @Override + public long scanAndAggregate( + SingleValueHistoricalDimensionSelector dimensionSelector, + HistoricalFloatColumnSelector metricSelector, + SimpleDoubleBufferAggregator aggregator, + int aggregatorSize, + HistoricalCursor cursor, + int[] positions, + ByteBuffer resultsBuffer + ) + { + // See TopNUtils.copyOffset() for explanation + Offset offset = (Offset) TopNUtils.copyOffset(cursor); + long scannedRows = 0; + int positionToAllocate = 0; + while (offset.withinBounds() && !Thread.currentThread().isInterrupted()) { + int rowNum = offset.getOffset(); + int dimIndex = dimensionSelector.getRowValue(rowNum); + int position = positions[dimIndex]; + if (position >= 0) { + aggregator.aggregate(resultsBuffer, position, metricSelector.get(rowNum)); + } else if (position == TopNAlgorithm.INIT_POSITION_VALUE) { + positions[dimIndex] = positionToAllocate; + aggregator.putFirst(resultsBuffer, positionToAllocate, metricSelector.get(rowNum)); + positionToAllocate += aggregatorSize; + } + scannedRows++; + offset.increment(); + } + return scannedRows; + } +} diff --git a/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java b/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java index 7432e9c5ad6..4d5974e2e59 100644 --- a/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java +++ b/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java @@ -21,6 +21,7 @@ package io.druid.query.topn; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; +import com.google.common.collect.ImmutableMap; import io.druid.collections.ResourceHolder; import io.druid.collections.StupidPool; import io.druid.java.util.common.Pair; @@ -28,6 +29,7 @@ import io.druid.java.util.common.guava.CloseQuietly; import io.druid.query.BaseQuery; import io.druid.query.ColumnSelectorPlus; import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.SimpleDoubleBufferAggregator; import io.druid.query.monomorphicprocessing.SpecializationService; import io.druid.query.monomorphicprocessing.SpecializationState; import io.druid.query.monomorphicprocessing.StringRuntimeShape; @@ -36,29 +38,156 @@ import io.druid.segment.Cursor; import io.druid.segment.DimensionSelector; import io.druid.segment.column.ValueType; import io.druid.segment.data.IndexedInts; +import io.druid.segment.data.Offset; +import io.druid.segment.historical.HistoricalCursor; +import io.druid.segment.historical.HistoricalDimensionSelector; +import io.druid.segment.historical.HistoricalFloatColumnSelector; +import io.druid.segment.historical.SingleValueHistoricalDimensionSelector; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; /** */ public class PooledTopNAlgorithm extends BaseTopNAlgorithm { - /** Non-final fields for testing, see TopNQueryRunnerTest */ - @VisibleForTesting - static boolean specializeGeneric1AggPooledTopN = + private static boolean specializeGeneric1AggPooledTopN = !Boolean.getBoolean("dontSpecializeGeneric1AggPooledTopN"); - @VisibleForTesting - static boolean specializeGeneric2AggPooledTopN = + private static boolean specializeGeneric2AggPooledTopN = !Boolean.getBoolean("dontSpecializeGeneric2AggPooledTopN"); + private static boolean specializeHistorical1SimpleDoubleAggPooledTopN = + !Boolean.getBoolean("dontSpecializeHistorical1SimpleDoubleAggPooledTopN"); + private static boolean specializeHistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopN = + !Boolean.getBoolean("dontSpecializeHistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopN"); + + /** See TopNQueryRunnerTest */ + @VisibleForTesting + static void setSpecializeGeneric1AggPooledTopN(boolean value) + { + PooledTopNAlgorithm.specializeGeneric1AggPooledTopN = value; + computeSpecializedScanAndAggregateImplementations(); + } + + @VisibleForTesting + static void setSpecializeGeneric2AggPooledTopN(boolean value) + { + PooledTopNAlgorithm.specializeGeneric2AggPooledTopN = value; + computeSpecializedScanAndAggregateImplementations(); + } + + @VisibleForTesting + static void setSpecializeHistorical1SimpleDoubleAggPooledTopN(boolean value) + { + PooledTopNAlgorithm.specializeHistorical1SimpleDoubleAggPooledTopN = value; + computeSpecializedScanAndAggregateImplementations(); + } + + @VisibleForTesting + static void setSpecializeHistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopN(boolean value) + { + PooledTopNAlgorithm.specializeHistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopN = value; + computeSpecializedScanAndAggregateImplementations(); + } private static final Generic1AggPooledTopNScanner defaultGeneric1AggScanner = new Generic1AggPooledTopNScannerPrototype(); private static final Generic2AggPooledTopNScanner defaultGeneric2AggScanner = new Generic2AggPooledTopNScannerPrototype(); + private static final Historical1AggPooledTopNScanner defaultHistorical1SimpleDoubleAggScanner = + new Historical1SimpleDoubleAggPooledTopNScannerPrototype(); + private static final + Historical1AggPooledTopNScanner defaultHistoricalSingleValueDimSelector1SimpleDoubleAggScanner = + new HistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopNScannerPrototype(); + + private interface ScanAndAggregate + { + /** + * If this implementation of ScanAndAggregate is executable with the given parameters, run it and return true. + * Otherwise return false (scanning and aggregation is not done). + */ + boolean scanAndAggregate( + final PooledTopNParams params, + final int[] positions, + final BufferAggregator[] theAggregators + ); + } + + private static final List specializedScanAndAggregateImplementations = new ArrayList<>(); + static { + computeSpecializedScanAndAggregateImplementations(); + } + + private static void computeSpecializedScanAndAggregateImplementations() + { + specializedScanAndAggregateImplementations.clear(); + // The order of the following `if` blocks matters, "more specialized" implementations go first + if (specializeHistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopN) { + specializedScanAndAggregateImplementations.add((params, positions, theAggregators) -> { + if (theAggregators.length == 1) { + BufferAggregator aggregator = theAggregators[0]; + final Cursor cursor = params.getCursor(); + if (cursor instanceof HistoricalCursor && aggregator instanceof SimpleDoubleBufferAggregator) { + if (params.getDimSelector() instanceof SingleValueHistoricalDimensionSelector && + ((SimpleDoubleBufferAggregator) aggregator).getSelector() instanceof HistoricalFloatColumnSelector) { + scanAndAggregateHistorical1SimpleDoubleAgg( + params, + positions, + (SimpleDoubleBufferAggregator) aggregator, + (HistoricalCursor) cursor, + defaultHistoricalSingleValueDimSelector1SimpleDoubleAggScanner + ); + return true; + } + } + } + return false; + }); + } + if (specializeHistorical1SimpleDoubleAggPooledTopN) { + specializedScanAndAggregateImplementations.add((params, positions, theAggregators) -> { + if (theAggregators.length == 1) { + BufferAggregator aggregator = theAggregators[0]; + final Cursor cursor = params.getCursor(); + if (cursor instanceof HistoricalCursor && aggregator instanceof SimpleDoubleBufferAggregator) { + if (params.getDimSelector() instanceof HistoricalDimensionSelector && + ((SimpleDoubleBufferAggregator) aggregator).getSelector() instanceof HistoricalFloatColumnSelector) { + scanAndAggregateHistorical1SimpleDoubleAgg( + params, + positions, + (SimpleDoubleBufferAggregator) aggregator, + (HistoricalCursor) cursor, + defaultHistorical1SimpleDoubleAggScanner + ); + return true; + } + } + } + return false; + }); + } + if (specializeGeneric1AggPooledTopN) { + specializedScanAndAggregateImplementations.add((params, positions, theAggregators) -> { + if (theAggregators.length == 1) { + scanAndAggregateGeneric1Agg(params, positions, theAggregators[0], params.getCursor()); + return true; + } + return false; + }); + } + if (specializeGeneric2AggPooledTopN) { + specializedScanAndAggregateImplementations.add((params, positions, theAggregators) -> { + if (theAggregators.length == 2) { + scanAndAggregateGeneric2Agg(params, positions, theAggregators, params.getCursor()); + return true; + } + return false; + }); + } + } - private final Capabilities capabilities; private final TopNQuery query; private final StupidPool bufferPool; private static final int AGG_UNROLL_COUNT = 8; // Must be able to fit loop below @@ -70,8 +199,6 @@ public class PooledTopNAlgorithm ) { super(capabilities); - - this.capabilities = capabilities; this.query = query; this.bufferPool = bufferPool; } @@ -193,17 +320,45 @@ public class PooledTopNAlgorithm final int numProcessed ) { - final Cursor cursor = params.getCursor(); - if (specializeGeneric1AggPooledTopN && theAggregators.length == 1) { - scanAndAggregateGeneric1Agg(params, positions, theAggregators[0], cursor); - } else if (specializeGeneric2AggPooledTopN && theAggregators.length == 2) { - scanAndAggregateGeneric2Agg(params, positions, theAggregators, cursor); - } else { - scanAndAggregateDefault(params, positions, theAggregators); + for (ScanAndAggregate specializedScanAndAggregate : specializedScanAndAggregateImplementations) { + if (specializedScanAndAggregate.scanAndAggregate(params, positions, theAggregators)) { + BaseQuery.checkInterrupted(); + return; + } } + scanAndAggregateDefault(params, positions, theAggregators); BaseQuery.checkInterrupted(); } + private static void scanAndAggregateHistorical1SimpleDoubleAgg( + PooledTopNParams params, + int[] positions, + SimpleDoubleBufferAggregator aggregator, + HistoricalCursor cursor, + Historical1AggPooledTopNScanner prototypeScanner + ) + { + String runtimeShape = StringRuntimeShape.of(aggregator); + SpecializationState specializationState = + SpecializationService.getSpecializationState( + prototypeScanner.getClass(), + runtimeShape, + ImmutableMap.of(Offset.class, cursor.getOffset().getClass()) + ); + Historical1AggPooledTopNScanner scanner = specializationState.getSpecializedOrDefault(prototypeScanner); + + long scannedRows = scanner.scanAndAggregate( + (HistoricalDimensionSelector) params.getDimSelector(), + aggregator.getSelector(), + aggregator, + params.getAggregatorSizes()[0], + cursor, + positions, + params.getResultsBuf() + ); + specializationState.accountLoopIterations(scannedRows); + } + private static void scanAndAggregateGeneric1Agg( PooledTopNParams params, int[] positions, diff --git a/processing/src/main/java/io/druid/query/topn/TopNUtils.java b/processing/src/main/java/io/druid/query/topn/TopNUtils.java new file mode 100644 index 00000000000..08afbcb3afd --- /dev/null +++ b/processing/src/main/java/io/druid/query/topn/TopNUtils.java @@ -0,0 +1,45 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.topn; + +import io.druid.segment.historical.HistoricalCursor; + +final class TopNUtils +{ + /** + * Returns Object, so javac couldn't remove cast in methods like + * {@link HistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopNScannerPrototype#scanAndAggregate}. That cast is + * needed, because when TopNScannerPrototype is specialized, occurrences of {@link io.druid.segment.data.Offset} are + * replaced with the specific Offset subtype in the TopNScannerPrototype bytecode, via {@link + * io.druid.query.monomorphicprocessing.SpecializationService#getSpecializationState(Class, String, + * com.google.common.collect.ImmutableMap)}, providing ImmutableMap.of(Offset.class, specificOffsetSubtype) as the + * classRemapping argument. + * + * Casting to the specific Offset subtype helps Hotspot JIT (OpenJDK 8) to generate better assembly. It shouldn't be + * so, because the Offset subtype is still always the same (otherwise cast wouldn't be possible), so JIT should + * generate equivalent code. In OpenJDK 9 Hotspot could be improved and this "casting hack" is not needed anymore. + */ + static Object copyOffset(HistoricalCursor cursor) + { + return cursor.getOffset().clone(); + } + + private TopNUtils() {} +} diff --git a/processing/src/main/java/io/druid/segment/BitmapOffset.java b/processing/src/main/java/io/druid/segment/BitmapOffset.java index 8b1d0297952..87f815d179d 100644 --- a/processing/src/main/java/io/druid/segment/BitmapOffset.java +++ b/processing/src/main/java/io/druid/segment/BitmapOffset.java @@ -36,7 +36,7 @@ import java.util.HashSet; /** */ -public class BitmapOffset implements Offset +public class BitmapOffset extends Offset { private static final int INVALID_VALUE = -1; private static final BitmapFactory ROARING_BITMAP_FACTORY = new RoaringBitmapSerdeFactory(false).getBitmapFactory(); diff --git a/processing/src/main/java/io/druid/segment/ColumnSelectorBitmapIndexSelector.java b/processing/src/main/java/io/druid/segment/ColumnSelectorBitmapIndexSelector.java index 23cfabf143a..e11d4435c91 100644 --- a/processing/src/main/java/io/druid/segment/ColumnSelectorBitmapIndexSelector.java +++ b/processing/src/main/java/io/druid/segment/ColumnSelectorBitmapIndexSelector.java @@ -24,6 +24,7 @@ import io.druid.collections.bitmap.BitmapFactory; import io.druid.collections.bitmap.ImmutableBitmap; import io.druid.collections.spatial.ImmutableRTree; import io.druid.query.filter.BitmapIndexSelector; +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import io.druid.segment.column.BitmapIndex; import io.druid.segment.column.Column; import io.druid.segment.column.ColumnCapabilities; @@ -99,6 +100,12 @@ public class ColumnSelectorBitmapIndexSelector implements BitmapIndexSelector { return IndexedIterable.create(this).iterator(); } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("column", column); + } }; } diff --git a/processing/src/main/java/io/druid/segment/CompressedVSizeIndexedSupplier.java b/processing/src/main/java/io/druid/segment/CompressedVSizeIndexedSupplier.java index 665802b819d..53cb38b1dde 100644 --- a/processing/src/main/java/io/druid/segment/CompressedVSizeIndexedSupplier.java +++ b/processing/src/main/java/io/druid/segment/CompressedVSizeIndexedSupplier.java @@ -21,6 +21,7 @@ package io.druid.segment; import io.druid.java.util.common.IAE; import io.druid.java.util.common.io.smoosh.SmooshedFileMapper; +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import io.druid.segment.data.CompressedObjectStrategy; import io.druid.segment.data.CompressedVSizeIntsIndexedSupplier; import io.druid.segment.data.IndexedInts; @@ -216,6 +217,12 @@ public class CompressedVSizeIndexedSupplier implements WritableSupplier currentOffset && iter.hasNext()) { + iterOffset = iter.next(); + } + + return iterOffset == currentOffset; + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("holder", holder); + inspector.visit("iter", iter); + } + }; + } else { + return new ValueMatcher() + { + int iterOffset = -1; + + @Override + public boolean matches() + { + int currentOffset = holder.getOffset().getOffset(); + while (iterOffset < currentOffset && iter.hasNext()) { + iterOffset = iter.next(); + } + + return iterOffset == currentOffset; + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("holder", holder); + inspector.visit("iter", iter); + } + }; + } + } + } +} diff --git a/processing/src/main/java/io/druid/segment/FloatDimensionIndexer.java b/processing/src/main/java/io/druid/segment/FloatDimensionIndexer.java index 36e0d2ce3ca..a7f5f75450c 100644 --- a/processing/src/main/java/io/druid/segment/FloatDimensionIndexer.java +++ b/processing/src/main/java/io/druid/segment/FloatDimensionIndexer.java @@ -120,6 +120,7 @@ public class FloatDimensionIndexer implements DimensionIndexer(Collections.singletonList(""), String.class); protected static final SerializerUtils serializerUtils = new SerializerUtils(); protected static final int INVALID_ROW = -1; - protected static final Splitter SPLITTER = Splitter.on(","); protected final ObjectMapper mapper; protected final IndexIO indexIO; @@ -1178,46 +1172,6 @@ public class IndexMerger } } - public static class AggFactoryStringIndexed implements Indexed - { - private final AggregatorFactory[] metricAggs; - - public AggFactoryStringIndexed(AggregatorFactory[] metricAggs) - { - this.metricAggs = metricAggs; - } - - @Override - public Class getClazz() - { - return String.class; - } - - @Override - public int size() - { - return metricAggs.length; - } - - @Override - public String get(int index) - { - return metricAggs[index].getName(); - } - - @Override - public int indexOf(String value) - { - throw new UnsupportedOperationException(); - } - - @Override - public Iterator iterator() - { - return IndexedIterable.create(this).iterator(); - } - } - public static class RowboatMergeFunction implements BinaryFn { private final AggregatorFactory[] metricAggs; diff --git a/processing/src/main/java/io/druid/segment/LongDimensionIndexer.java b/processing/src/main/java/io/druid/segment/LongDimensionIndexer.java index 963c03383a9..0b24837db1c 100644 --- a/processing/src/main/java/io/druid/segment/LongDimensionIndexer.java +++ b/processing/src/main/java/io/druid/segment/LongDimensionIndexer.java @@ -119,6 +119,7 @@ public class LongDimensionIndexer implements DimensionIndexer @Override public void inspectRuntimeShape(RuntimeShapeInspector inspector) { + // nothing to inspect } } @@ -150,6 +151,7 @@ public class LongDimensionIndexer implements DimensionIndexer @Override public void inspectRuntimeShape(RuntimeShapeInspector inspector) { + // nothing to inspect } } diff --git a/processing/src/main/java/io/druid/segment/NullDimensionSelector.java b/processing/src/main/java/io/druid/segment/NullDimensionSelector.java index 385bbad0c7d..ef7fd5838ab 100644 --- a/processing/src/main/java/io/druid/segment/NullDimensionSelector.java +++ b/processing/src/main/java/io/druid/segment/NullDimensionSelector.java @@ -26,10 +26,11 @@ import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import io.druid.segment.data.IndexedInts; import io.druid.segment.data.ZeroIndexedInts; import io.druid.segment.filter.BooleanValueMatcher; +import io.druid.segment.historical.SingleValueHistoricalDimensionSelector; import javax.annotation.Nullable; -public class NullDimensionSelector implements DimensionSelector, IdLookup +public class NullDimensionSelector implements SingleValueHistoricalDimensionSelector, IdLookup { private static final NullDimensionSelector INSTANCE = new NullDimensionSelector(); @@ -49,6 +50,24 @@ public class NullDimensionSelector implements DimensionSelector, IdLookup return ZeroIndexedInts.instance(); } + @Override + public int getRowValue() + { + return 0; + } + + @Override + public int getRowValue(int offset) + { + return 0; + } + + @Override + public IndexedInts getRow(int offset) + { + return getRow(); + } + @Override public ValueMatcher makeValueMatcher(String value) { @@ -96,5 +115,6 @@ public class NullDimensionSelector implements DimensionSelector, IdLookup @Override public void inspectRuntimeShape(RuntimeShapeInspector inspector) { + // nothing to inspect } } diff --git a/processing/src/main/java/io/druid/segment/QueryableIndexIndexableAdapter.java b/processing/src/main/java/io/druid/segment/QueryableIndexIndexableAdapter.java index 7fdbf4e985a..dad9a84b4ce 100644 --- a/processing/src/main/java/io/druid/segment/QueryableIndexIndexableAdapter.java +++ b/processing/src/main/java/io/druid/segment/QueryableIndexIndexableAdapter.java @@ -28,6 +28,7 @@ import io.druid.java.util.common.io.Closer; import io.druid.java.util.common.ISE; import io.druid.java.util.common.guava.CloseQuietly; import io.druid.java.util.common.logger.Logger; +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import io.druid.segment.column.BitmapIndex; import io.druid.segment.column.Column; import io.druid.segment.column.ColumnCapabilities; @@ -165,6 +166,12 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter { return IndexedIterable.create(this).iterator(); } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("dict", dict); + } }; } diff --git a/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java index d3af3b9e9e0..f94aa788c65 100644 --- a/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java @@ -20,24 +20,18 @@ package io.druid.segment; import com.google.common.base.Function; -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import io.druid.collections.bitmap.ImmutableBitmap; -import io.druid.java.util.common.io.Closer; import io.druid.java.util.common.granularity.Granularity; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; +import io.druid.java.util.common.io.Closer; import io.druid.query.BaseQuery; import io.druid.query.dimension.DimensionSpec; import io.druid.query.extraction.ExtractionFn; -import io.druid.query.filter.BooleanFilter; import io.druid.query.filter.Filter; -import io.druid.query.filter.RowOffsetMatcherFactory; -import io.druid.query.filter.ValueMatcher; import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import io.druid.segment.column.BitmapIndex; import io.druid.segment.column.Column; @@ -50,19 +44,16 @@ import io.druid.segment.data.Indexed; import io.druid.segment.data.IndexedInts; import io.druid.segment.data.Offset; import io.druid.segment.filter.AndFilter; -import io.druid.segment.filter.BooleanValueMatcher; -import it.unimi.dsi.fastutil.ints.IntIterators; +import io.druid.segment.historical.HistoricalCursor; +import io.druid.segment.historical.HistoricalFloatColumnSelector; import org.joda.time.DateTime; import org.joda.time.Interval; -import org.roaringbitmap.IntIterator; -import javax.annotation.Nullable; import java.io.Closeable; -import java.io.IOException; import java.util.ArrayList; -import java.util.BitSet; import java.util.List; import java.util.Map; +import java.util.Objects; /** */ @@ -305,7 +296,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter postFilter, selector ).build(), - Predicates.notNull() + Objects::nonNull ); } @@ -320,7 +311,6 @@ public class QueryableIndexStorageAdapter implements StorageAdapter private static class CursorSequenceBuilder { - private final StorageAdapter storageAdapter; private final QueryableIndex index; private final Interval interval; private final VirtualColumns virtualColumns; @@ -345,7 +335,6 @@ public class QueryableIndexStorageAdapter implements StorageAdapter ColumnSelectorBitmapIndexSelector bitmapIndexSelector ) { - this.storageAdapter = storageAdapter; this.index = storageAdapter.index; this.interval = interval; this.virtualColumns = virtualColumns; @@ -418,11 +407,44 @@ public class QueryableIndexStorageAdapter implements StorageAdapter final Offset initOffset = offset.clone(); final DateTime myBucket = gran.toDateTime(inputInterval.getStartMillis()); - final CursorOffsetHolder cursorOffsetHolder = new CursorOffsetHolder(); - abstract class QueryableIndexBaseCursor implements Cursor + abstract class QueryableIndexBaseCursor implements HistoricalCursor { - Offset cursorOffset; + OffsetType cursorOffset; + + @Override + public OffsetType getOffset() + { + return cursorOffset; + } + + @Override + public DateTime getTime() + { + return myBucket; + } + + @Override + public void advanceTo(int offset) + { + int count = 0; + while (count < offset && !isDone()) { + advance(); + count++; + } + } + + @Override + public boolean isDone() + { + return !cursorOffset.withinBounds(); + } + + @Override + public boolean isDoneOrInterrupted() + { + return isDone() || Thread.currentThread().isInterrupted(); + } @Override public DimensionSelector makeDimensionSelector( @@ -472,204 +494,13 @@ public class QueryableIndexStorageAdapter implements StorageAdapter } final DictionaryEncodedColumn column = cachedColumn; - - abstract class QueryableDimensionSelector implements DimensionSelector, IdLookup - { - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - inspector.visit("column", column); - inspector.visit("cursorOffset", cursorOffset); - inspector.visit("extractionFn", extractionFn); - } - } if (column == null) { return NullDimensionSelector.instance(); - } else if (columnDesc.getCapabilities().hasMultipleValues()) { - class MultiValueDimensionSelector extends QueryableDimensionSelector - { - @Override - public IndexedInts getRow() - { - return column.getMultiValueRow(cursorOffset.getOffset()); - } - - @Override - public ValueMatcher makeValueMatcher(String value) - { - return DimensionSelectorUtils.makeValueMatcherGeneric(this, value); - } - - @Override - public ValueMatcher makeValueMatcher(Predicate predicate) - { - return DimensionSelectorUtils.makeValueMatcherGeneric(this, predicate); - } - - @Override - public int getValueCardinality() - { - return column.getCardinality(); - } - - @Override - public String lookupName(int id) - { - final String value = column.lookupName(id); - return extractionFn == null ? - value : - extractionFn.apply(value); - } - - @Override - public boolean nameLookupPossibleInAdvance() - { - return true; - } - - @Nullable - @Override - public IdLookup idLookup() - { - return extractionFn == null ? this : null; - } - - @Override - public int lookupId(String name) - { - if (extractionFn != null) { - throw new UnsupportedOperationException( - "cannot perform lookup when applying an extraction function" - ); - } - return column.lookupId(name); - } - } - return new MultiValueDimensionSelector(); } else { - class SingleValueDimensionSelector extends QueryableDimensionSelector - { - @Override - public IndexedInts getRow() - { - // using an anonymous class is faster than creating a class that stores a copy of the value - return new IndexedInts() - { - @Override - public int size() - { - return 1; - } - - @Override - public int get(int index) - { - return column.getSingleValueRow(cursorOffset.getOffset()); - } - - @Override - public it.unimi.dsi.fastutil.ints.IntIterator iterator() - { - return IntIterators.singleton(column.getSingleValueRow(cursorOffset.getOffset())); - } - - @Override - public void fill(int index, int[] toFill) - { - throw new UnsupportedOperationException("fill not supported"); - } - - @Override - public void close() throws IOException - { - - } - }; - } - - @Override - public ValueMatcher makeValueMatcher(final String value) - { - if (extractionFn == null) { - final int valueId = lookupId(value); - if (valueId >= 0) { - return new ValueMatcher() - { - @Override - public boolean matches() - { - return column.getSingleValueRow(cursorOffset.getOffset()) == valueId; - } - }; - } else { - return BooleanValueMatcher.of(false); - } - } else { - // Employ precomputed BitSet optimization - return makeValueMatcher(Predicates.equalTo(value)); - } - } - - @Override - public ValueMatcher makeValueMatcher(final Predicate predicate) - { - final BitSet predicateMatchingValueIds = DimensionSelectorUtils.makePredicateMatchingSet( - this, - predicate - ); - return new ValueMatcher() - { - @Override - public boolean matches() - { - int rowValueId = column.getSingleValueRow(cursorOffset.getOffset()); - return predicateMatchingValueIds.get(rowValueId); - } - }; - } - - @Override - public int getValueCardinality() - { - return column.getCardinality(); - } - - @Override - public String lookupName(int id) - { - final String value = column.lookupName(id); - return extractionFn == null ? value : extractionFn.apply(value); - } - - @Override - public boolean nameLookupPossibleInAdvance() - { - return true; - } - - @Nullable - @Override - public IdLookup idLookup() - { - return extractionFn == null ? this : null; - } - - @Override - public int lookupId(String name) - { - if (extractionFn != null) { - throw new UnsupportedOperationException( - "cannot perform lookup when applying an extraction function" - ); - } - return column.lookupId(name); - } - } - return new SingleValueDimensionSelector(); + return column.makeDimensionSelector(this, extractionFn); } } - @Override public FloatColumnSelector makeFloatColumnSelector(String columnName) { @@ -694,7 +525,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter } final GenericColumn metricVals = cachedMetricVals; - return new FloatColumnSelector() + return new HistoricalFloatColumnSelector() { @Override public float get() @@ -702,6 +533,12 @@ public class QueryableIndexStorageAdapter implements StorageAdapter return metricVals.getFloatSingleValueRow(cursorOffset.getOffset()); } + @Override + public float get(int offset) + { + return metricVals.getFloatSingleValueRow(offset); + } + @Override public void inspectRuntimeShape(RuntimeShapeInspector inspector) { @@ -922,18 +759,12 @@ public class QueryableIndexStorageAdapter implements StorageAdapter } if (postFilter == null) { - return new QueryableIndexBaseCursor() + return new QueryableIndexBaseCursor() { { reset(); } - @Override - public DateTime getTime() - { - return myBucket; - } - @Override public void advance() { @@ -947,139 +778,39 @@ public class QueryableIndexStorageAdapter implements StorageAdapter cursorOffset.increment(); } - @Override - public void advanceTo(int offset) - { - int count = 0; - while (count < offset && !isDone()) { - advance(); - count++; - } - } - - @Override - public boolean isDone() - { - return !cursorOffset.withinBounds(); - } - - @Override - public boolean isDoneOrInterrupted() - { - return isDone() || Thread.currentThread().isInterrupted(); - } - @Override public void reset() { cursorOffset = initOffset.clone(); - cursorOffsetHolder.set(cursorOffset); } }; } else { - return new QueryableIndexBaseCursor() + return new QueryableIndexBaseCursor() { - RowOffsetMatcherFactory rowOffsetMatcherFactory = new CursorOffsetHolderRowOffsetMatcherFactory( - cursorOffsetHolder, - descending - ); - - final ValueMatcher filterMatcher; - - { - if (postFilter instanceof BooleanFilter) { - filterMatcher = ((BooleanFilter) postFilter).makeMatcher( - bitmapIndexSelector, - this, - rowOffsetMatcherFactory - ); - } else { - if (postFilter.supportsBitmapIndex(bitmapIndexSelector)) { - filterMatcher = rowOffsetMatcherFactory.makeRowOffsetMatcher(postFilter.getBitmapIndex( - bitmapIndexSelector)); - } else { - filterMatcher = postFilter.makeMatcher(this); - } - } - } - { + cursorOffset = new FilteredOffset(this, descending, postFilter, bitmapIndexSelector); reset(); } - @Override - public DateTime getTime() - { - return myBucket; - } - @Override public void advance() { BaseQuery.checkInterrupted(); - cursorOffset.increment(); - - while (!isDone()) { - BaseQuery.checkInterrupted(); - if (filterMatcher.matches()) { - return; - } else { - cursorOffset.increment(); - } - } + cursorOffset.incrementInterruptibly(); } @Override public void advanceUninterruptibly() { - if (Thread.currentThread().isInterrupted()) { - return; + if (!Thread.currentThread().isInterrupted()) { + cursorOffset.increment(); } - cursorOffset.increment(); - - while (!isDoneOrInterrupted()) { - if (filterMatcher.matches()) { - return; - } else { - cursorOffset.increment(); - } - } - } - - @Override - public void advanceTo(int offset) - { - int count = 0; - while (count < offset && !isDone()) { - advance(); - count++; - } - } - - @Override - public boolean isDone() - { - return !cursorOffset.withinBounds(); - } - - @Override - public boolean isDoneOrInterrupted() - { - return isDone() || Thread.currentThread().isInterrupted(); } @Override public void reset() { - cursorOffset = initOffset.clone(); - cursorOffsetHolder.set(cursorOffset); - if (!isDone()) { - if (filterMatcher.matches()) { - return; - } else { - advance(); - } - } + cursorOffset.reset(initOffset.clone()); } }; } @@ -1092,83 +823,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter } } - public static class CursorOffsetHolder - { - Offset currOffset = null; - - public Offset get() - { - return currOffset; - } - - public void set(Offset currOffset) - { - this.currOffset = currOffset; - } - } - - private static class CursorOffsetHolderRowOffsetMatcherFactory implements RowOffsetMatcherFactory - { - private final CursorOffsetHolder holder; - private final boolean descending; - - public CursorOffsetHolderRowOffsetMatcherFactory(CursorOffsetHolder holder, boolean descending) - { - this.holder = holder; - this.descending = descending; - } - - // Use an iterator-based implementation, ImmutableBitmap.get(index) works differently for Concise and Roaring. - // ImmutableConciseSet.get(index) is also inefficient, it performs a linear scan on each call - @Override - public ValueMatcher makeRowOffsetMatcher(final ImmutableBitmap rowBitmap) - { - final IntIterator iter = descending ? - BitmapOffset.getReverseBitmapOffsetIterator(rowBitmap) : - rowBitmap.iterator(); - - if (!iter.hasNext()) { - return BooleanValueMatcher.of(false); - } - - if (descending) { - return new ValueMatcher() - { - int iterOffset = Integer.MAX_VALUE; - - @Override - public boolean matches() - { - int currentOffset = holder.get().getOffset(); - while (iterOffset > currentOffset && iter.hasNext()) { - iterOffset = iter.next(); - } - - return iterOffset == currentOffset; - } - }; - } else { - return new ValueMatcher() - { - int iterOffset = -1; - - @Override - public boolean matches() - { - int currentOffset = holder.get().getOffset(); - while (iterOffset < currentOffset && iter.hasNext()) { - iterOffset = iter.next(); - } - - return iterOffset == currentOffset; - } - }; - } - } - } - - - private abstract static class TimestampCheckingOffset implements Offset + public abstract static class TimestampCheckingOffset extends Offset { protected final Offset baseOffset; protected final GenericColumn timestamps; @@ -1230,7 +885,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter } } - private static class AscendingTimestampCheckingOffset extends TimestampCheckingOffset + public static class AscendingTimestampCheckingOffset extends TimestampCheckingOffset { public AscendingTimestampCheckingOffset( Offset baseOffset, @@ -1262,7 +917,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter } } - private static class DescendingTimestampCheckingOffset extends TimestampCheckingOffset + public static class DescendingTimestampCheckingOffset extends TimestampCheckingOffset { public DescendingTimestampCheckingOffset( Offset baseOffset, @@ -1295,7 +950,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter } } - private static class NoFilterOffset implements Offset + public static class NoFilterOffset extends Offset { private final int rowCount; private final boolean descending; diff --git a/processing/src/main/java/io/druid/segment/SingleScanTimeDimSelector.java b/processing/src/main/java/io/druid/segment/SingleScanTimeDimSelector.java index 9e4f296b210..f7e41d57296 100644 --- a/processing/src/main/java/io/druid/segment/SingleScanTimeDimSelector.java +++ b/processing/src/main/java/io/druid/segment/SingleScanTimeDimSelector.java @@ -31,7 +31,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; -public class SingleScanTimeDimSelector implements DimensionSelector +public class SingleScanTimeDimSelector implements SingleValueDimensionSelector { private final ExtractionFn extractionFn; private final LongColumnSelector selector; @@ -64,6 +64,12 @@ public class SingleScanTimeDimSelector implements DimensionSelector return new SingleIndexedInt(getDimensionValueIndex()); } + @Override + public int getRowValue() + { + return getDimensionValueIndex(); + } + @Override public ValueMatcher makeValueMatcher(final String value) { @@ -74,6 +80,12 @@ public class SingleScanTimeDimSelector implements DimensionSelector { return Objects.equals(lookupName(getDimensionValueIndex()), value); } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("selector", SingleScanTimeDimSelector.this); + } }; } @@ -87,6 +99,13 @@ public class SingleScanTimeDimSelector implements DimensionSelector { return predicate.apply(lookupName(getDimensionValueIndex())); } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("selector", SingleScanTimeDimSelector.this); + inspector.visit("predicate", predicate); + } }; } diff --git a/processing/src/main/java/io/druid/segment/data/ListBasedIndexedInts.java b/processing/src/main/java/io/druid/segment/SingleValueDimensionSelector.java similarity index 52% rename from processing/src/main/java/io/druid/segment/data/ListBasedIndexedInts.java rename to processing/src/main/java/io/druid/segment/SingleValueDimensionSelector.java index a76c5f2fe51..39f5730b78f 100644 --- a/processing/src/main/java/io/druid/segment/data/ListBasedIndexedInts.java +++ b/processing/src/main/java/io/druid/segment/SingleValueDimensionSelector.java @@ -17,48 +17,18 @@ * under the License. */ -package io.druid.segment.data; +package io.druid.segment; -import it.unimi.dsi.fastutil.ints.IntIterator; - -import java.io.IOException; -import java.util.List; +import io.druid.query.monomorphicprocessing.CalledFromHotLoop; /** + * Specialization for {@link DimensionSelector}s, always having a single value in {@link #getRow()}. */ -public class ListBasedIndexedInts implements IndexedInts +public interface SingleValueDimensionSelector extends DimensionSelector { - private final List expansion; - - public ListBasedIndexedInts(List expansion) {this.expansion = expansion;} - - @Override - public int size() - { - return expansion.size(); - } - - @Override - public int get(int index) - { - return expansion.get(index); - } - - @Override - public IntIterator iterator() - { - return new IndexedIntsIterator(this); - } - - @Override - public void fill(int index, int[] toFill) - { - throw new UnsupportedOperationException("fill not supported"); - } - - @Override - public void close() throws IOException - { - - } + /** + * Returns a single value of {@link #getRow()}. + */ + @CalledFromHotLoop + int getRowValue(); } diff --git a/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java b/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java index 830547369ad..2fb9b31d318 100644 --- a/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java +++ b/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java @@ -327,6 +327,12 @@ public class StringDimensionIndexer implements DimensionIndexer { private static final Logger log = new Logger(StringDimensionMergerV9.class); - protected static final ListIndexed EMPTY_STR_DIM_VAL = new ListIndexed<>(Collections.singletonList(""), String.class); + protected static final Indexed EMPTY_STR_DIM_VAL = new ArrayIndexed<>(new String[]{""}, String.class); protected static final int[] EMPTY_STR_DIM_ARRAY = new int[]{0}; protected static final Splitter SPLITTER = Splitter.on(","); diff --git a/processing/src/main/java/io/druid/segment/ZeroFloatColumnSelector.java b/processing/src/main/java/io/druid/segment/ZeroFloatColumnSelector.java index 295ecd23539..ffcd67e83fa 100644 --- a/processing/src/main/java/io/druid/segment/ZeroFloatColumnSelector.java +++ b/processing/src/main/java/io/druid/segment/ZeroFloatColumnSelector.java @@ -20,8 +20,9 @@ package io.druid.segment; import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import io.druid.segment.historical.HistoricalFloatColumnSelector; -public final class ZeroFloatColumnSelector implements FloatColumnSelector +public final class ZeroFloatColumnSelector implements HistoricalFloatColumnSelector { private static final ZeroFloatColumnSelector INSTANCE = new ZeroFloatColumnSelector(); @@ -41,8 +42,15 @@ public final class ZeroFloatColumnSelector implements FloatColumnSelector return 0.0f; } + @Override + public float get(int offset) + { + return get(); + } + @Override public void inspectRuntimeShape(RuntimeShapeInspector inspector) { + // nothing to inspect } } diff --git a/processing/src/main/java/io/druid/segment/ZeroLongColumnSelector.java b/processing/src/main/java/io/druid/segment/ZeroLongColumnSelector.java index fbf765fd93e..71cb8128ce1 100644 --- a/processing/src/main/java/io/druid/segment/ZeroLongColumnSelector.java +++ b/processing/src/main/java/io/druid/segment/ZeroLongColumnSelector.java @@ -44,5 +44,6 @@ public final class ZeroLongColumnSelector implements LongColumnSelector @Override public void inspectRuntimeShape(RuntimeShapeInspector inspector) { + // nothing to inspect } } diff --git a/processing/src/main/java/io/druid/segment/column/DictionaryEncodedColumn.java b/processing/src/main/java/io/druid/segment/column/DictionaryEncodedColumn.java index 22d19b606f9..0a6cd57c8c2 100644 --- a/processing/src/main/java/io/druid/segment/column/DictionaryEncodedColumn.java +++ b/processing/src/main/java/io/druid/segment/column/DictionaryEncodedColumn.java @@ -19,7 +19,10 @@ package io.druid.segment.column; +import io.druid.query.extraction.ExtractionFn; +import io.druid.segment.DimensionSelector; import io.druid.segment.data.IndexedInts; +import io.druid.segment.historical.OffsetHolder; import java.io.Closeable; @@ -34,4 +37,6 @@ public interface DictionaryEncodedColumn extends public ActualType lookupName(int id); public int lookupId(ActualType name); public int getCardinality(); + + DimensionSelector makeDimensionSelector(OffsetHolder offsetHolder, ExtractionFn extractionFn); } diff --git a/processing/src/main/java/io/druid/segment/column/SimpleDictionaryEncodedColumn.java b/processing/src/main/java/io/druid/segment/column/SimpleDictionaryEncodedColumn.java index 35fb8d03e4c..754282cc285 100644 --- a/processing/src/main/java/io/druid/segment/column/SimpleDictionaryEncodedColumn.java +++ b/processing/src/main/java/io/druid/segment/column/SimpleDictionaryEncodedColumn.java @@ -19,13 +19,27 @@ package io.druid.segment.column; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; import com.google.common.base.Strings; import io.druid.java.util.common.guava.CloseQuietly; +import io.druid.query.extraction.ExtractionFn; +import io.druid.query.filter.ValueMatcher; +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import io.druid.segment.DimensionSelectorUtils; +import io.druid.segment.IdLookup; import io.druid.segment.data.CachingIndexed; import io.druid.segment.data.IndexedInts; import io.druid.segment.data.IndexedMultivalue; +import io.druid.segment.data.SingleIndexedInt; +import io.druid.segment.filter.BooleanValueMatcher; +import io.druid.segment.historical.HistoricalDimensionSelector; +import io.druid.segment.historical.OffsetHolder; +import io.druid.segment.historical.SingleValueHistoricalDimensionSelector; +import javax.annotation.Nullable; import java.io.IOException; +import java.util.BitSet; /** */ @@ -90,6 +104,184 @@ public class SimpleDictionaryEncodedColumn return cachedLookups.size(); } + @Override + public HistoricalDimensionSelector makeDimensionSelector( + final OffsetHolder offsetHolder, + final ExtractionFn extractionFn + ) + { + abstract class QueryableDimensionSelector implements HistoricalDimensionSelector, IdLookup + { + @Override + public int getValueCardinality() + { + return getCardinality(); + } + + @Override + public String lookupName(int id) + { + final String value = SimpleDictionaryEncodedColumn.this.lookupName(id); + return extractionFn == null ? + value : + extractionFn.apply(value); + } + + @Override + public boolean nameLookupPossibleInAdvance() + { + return true; + } + + @Nullable + @Override + public IdLookup idLookup() + { + return extractionFn == null ? this : null; + } + + @Override + public int lookupId(String name) + { + if (extractionFn != null) { + throw new UnsupportedOperationException( + "cannot perform lookup when applying an extraction function" + ); + } + return SimpleDictionaryEncodedColumn.this.lookupId(name); + } + } + + if (hasMultipleValues()) { + class MultiValueDimensionSelector extends QueryableDimensionSelector + { + @Override + public IndexedInts getRow() + { + return multiValueColumn.get(offsetHolder.getOffset().getOffset()); + } + + @Override + public IndexedInts getRow(int offset) + { + return multiValueColumn.get(offset); + } + + @Override + public ValueMatcher makeValueMatcher(String value) + { + return DimensionSelectorUtils.makeValueMatcherGeneric(this, value); + } + + @Override + public ValueMatcher makeValueMatcher(Predicate predicate) + { + return DimensionSelectorUtils.makeValueMatcherGeneric(this, predicate); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("multiValueColumn", multiValueColumn); + inspector.visit("offsetHolder", offsetHolder); + inspector.visit("offset", offsetHolder.getOffset()); + inspector.visit("extractionFn", extractionFn); + } + } + return new MultiValueDimensionSelector(); + } else { + class SingleValueQueryableDimensionSelector extends QueryableDimensionSelector + implements SingleValueHistoricalDimensionSelector + { + @Override + public IndexedInts getRow() + { + return new SingleIndexedInt(getRowValue()); + } + + @Override + public int getRowValue() + { + return column.get(offsetHolder.getOffset().getOffset()); + } + + @Override + public IndexedInts getRow(int offset) + { + return new SingleIndexedInt(getRowValue(offset)); + } + + @Override + public int getRowValue(int offset) + { + return column.get(offset); + } + + @Override + public ValueMatcher makeValueMatcher(final String value) + { + if (extractionFn == null) { + final int valueId = lookupId(value); + if (valueId >= 0) { + return new ValueMatcher() + { + @Override + public boolean matches() + { + return getRowValue() == valueId; + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("column", SimpleDictionaryEncodedColumn.this); + } + }; + } else { + return BooleanValueMatcher.of(false); + } + } else { + // Employ precomputed BitSet optimization + return makeValueMatcher(Predicates.equalTo(value)); + } + } + + @Override + public ValueMatcher makeValueMatcher(final Predicate predicate) + { + final BitSet predicateMatchingValueIds = DimensionSelectorUtils.makePredicateMatchingSet( + this, + predicate + ); + return new ValueMatcher() + { + @Override + public boolean matches() + { + return predicateMatchingValueIds.get(getRowValue()); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("column", SimpleDictionaryEncodedColumn.this); + } + }; + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("column", column); + inspector.visit("offsetHolder", offsetHolder); + inspector.visit("offset", offsetHolder.getOffset()); + inspector.visit("extractionFn", extractionFn); + } + } + return new SingleValueQueryableDimensionSelector(); + } + } + @Override public void close() throws IOException { diff --git a/processing/src/main/java/io/druid/segment/data/ArrayBasedIndexedInts.java b/processing/src/main/java/io/druid/segment/data/ArrayBasedIndexedInts.java index 71ad4025d0a..f78ba2fca54 100644 --- a/processing/src/main/java/io/druid/segment/data/ArrayBasedIndexedInts.java +++ b/processing/src/main/java/io/druid/segment/data/ArrayBasedIndexedInts.java @@ -20,6 +20,7 @@ package io.druid.segment.data; import io.druid.java.util.common.IAE; +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import it.unimi.dsi.fastutil.ints.IntArrays; import it.unimi.dsi.fastutil.ints.IntIterator; import it.unimi.dsi.fastutil.ints.IntIterators; @@ -90,6 +91,11 @@ public final class ArrayBasedIndexedInts implements IndexedInts @Override public void close() throws IOException { + } + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + // nothing to inspect } } diff --git a/processing/src/main/java/io/druid/segment/data/ArrayBasedOffset.java b/processing/src/main/java/io/druid/segment/data/ArrayBasedOffset.java index 41385ec77c7..90ce1da5dbe 100644 --- a/processing/src/main/java/io/druid/segment/data/ArrayBasedOffset.java +++ b/processing/src/main/java/io/druid/segment/data/ArrayBasedOffset.java @@ -23,7 +23,7 @@ import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; /** */ -public class ArrayBasedOffset implements Offset +public class ArrayBasedOffset extends Offset { private final int[] ints; private int currIndex; @@ -73,5 +73,6 @@ public class ArrayBasedOffset implements Offset @Override public void inspectRuntimeShape(RuntimeShapeInspector inspector) { + // nothing to inspect } } diff --git a/processing/src/main/java/io/druid/segment/data/ArrayIndexed.java b/processing/src/main/java/io/druid/segment/data/ArrayIndexed.java index 81a594e2f56..5cbc4bcf0a8 100644 --- a/processing/src/main/java/io/druid/segment/data/ArrayIndexed.java +++ b/processing/src/main/java/io/druid/segment/data/ArrayIndexed.java @@ -19,6 +19,8 @@ package io.druid.segment.data; +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; + import java.util.Arrays; import java.util.Iterator; @@ -67,4 +69,10 @@ public class ArrayIndexed implements Indexed { return Arrays.asList(baseArray).iterator(); } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + // nothing to inspect + } } diff --git a/processing/src/main/java/io/druid/segment/data/BitmapCompressedIndexedInts.java b/processing/src/main/java/io/druid/segment/data/BitmapCompressedIndexedInts.java index 2c78d39429c..a1f6346dedc 100644 --- a/processing/src/main/java/io/druid/segment/data/BitmapCompressedIndexedInts.java +++ b/processing/src/main/java/io/druid/segment/data/BitmapCompressedIndexedInts.java @@ -21,6 +21,7 @@ package io.druid.segment.data; import com.google.common.collect.Ordering; import io.druid.collections.bitmap.ImmutableBitmap; +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import io.druid.segment.IntIteratorUtils; import it.unimi.dsi.fastutil.ints.IntIterator; @@ -96,6 +97,11 @@ public class BitmapCompressedIndexedInts implements IndexedInts, Comparable implements Indexed, Closeable @Override public T get(int index) { - if(cachedValues != null) { + if (cachedValues != null) { final T cached = cachedValues.getValue(index); if (cached != null) { return cached; @@ -108,7 +109,14 @@ public class CachingIndexed implements Indexed, Closeable } } -private static class SizedLRUMap extends LinkedHashMap> + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("cachedValues", cachedValues != null); + inspector.visit("delegate", delegate); + } + + private static class SizedLRUMap extends LinkedHashMap> { private final int maxBytes; private int numBytes = 0; diff --git a/processing/src/main/java/io/druid/segment/data/CompressedIntsIndexedSupplier.java b/processing/src/main/java/io/druid/segment/data/CompressedIntsIndexedSupplier.java index d77de9865f6..3978dcb8ce8 100644 --- a/processing/src/main/java/io/druid/segment/data/CompressedIntsIndexedSupplier.java +++ b/processing/src/main/java/io/druid/segment/data/CompressedIntsIndexedSupplier.java @@ -27,6 +27,7 @@ import io.druid.collections.StupidResourceHolder; import io.druid.java.util.common.IAE; import io.druid.java.util.common.guava.CloseQuietly; import io.druid.java.util.common.io.smoosh.SmooshedFileMapper; +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import io.druid.segment.CompressedPools; import it.unimi.dsi.fastutil.ints.IntIterator; @@ -367,5 +368,14 @@ public class CompressedIntsIndexedSupplier implements WritableSupplier> getBaseBuffers() { return baseBuffers; @@ -411,5 +411,14 @@ public class CompressedVSizeIntsIndexedSupplier implements WritableSupplier implements Indexed private int logBaseTwoOfElementsPerValueFile; private int relativeIndexMask; - private ByteBuffer theBuffer; + private final ByteBuffer theBuffer; /** * Constructor for version one. @@ -210,6 +211,7 @@ public class GenericIndexed implements Indexed { this.versionOne = false; + this.theBuffer = null; this.strategy = strategy; this.allowReverseLookup = allowReverseLookup; this.valueBuffers = valueBuffs; @@ -347,6 +349,21 @@ public class GenericIndexed implements Indexed return strategy.fromByteBuffer(copyValueBuffer, size); } + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("versionOne", versionOne); + inspector.visit("headerBuffer", headerBuffer); + if (versionOne) { + inspector.visit("firstValueBuffer", firstValueBuffer); + } else { + // Inspecting just one example of valueBuffer, not needed to inspect the whole array, because all buffers in it + // are the same. + inspector.visit("valueBuffer", valueBuffers.length > 0 ? valueBuffers[0] : null); + } + inspector.visit("strategy", strategy); + } + abstract class BufferIndexed implements Indexed { int lastReadSize; @@ -525,6 +542,14 @@ public class GenericIndexed implements Indexed } return bufferedIndexedGet(copyBuffer, startOffset, endOffset); } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("headerBuffer", headerBuffer); + inspector.visit("copyBuffer", copyBuffer); + inspector.visit("strategy", strategy); + } }; } @@ -633,6 +658,16 @@ public class GenericIndexed implements Indexed int fileNum = index >> logBaseTwoOfElementsPerValueFile; return bufferedIndexedGet(copyValueBuffers[fileNum], startOffset, endOffset); } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("headerBuffer", headerBuffer); + // Inspecting just one example of copyValueBuffer, not needed to inspect the whole array, because all buffers + // in it are the same. + inspector.visit("copyValueBuffer", copyValueBuffers.length > 0 ? copyValueBuffers[0] : null); + inspector.visit("strategy", strategy); + } }; } } diff --git a/processing/src/main/java/io/druid/segment/data/Indexed.java b/processing/src/main/java/io/druid/segment/data/Indexed.java index d64ccf81f3f..1f6ee2ebf1c 100644 --- a/processing/src/main/java/io/druid/segment/data/Indexed.java +++ b/processing/src/main/java/io/druid/segment/data/Indexed.java @@ -19,12 +19,16 @@ package io.druid.segment.data; -public interface Indexed extends Iterable +import io.druid.query.monomorphicprocessing.CalledFromHotLoop; +import io.druid.query.monomorphicprocessing.HotLoopCallee; + +public interface Indexed extends Iterable, HotLoopCallee { Class getClazz(); int size(); + @CalledFromHotLoop T get(int index); /** diff --git a/processing/src/main/java/io/druid/segment/data/IndexedInts.java b/processing/src/main/java/io/druid/segment/data/IndexedInts.java index 2e097b24618..63af05f0169 100644 --- a/processing/src/main/java/io/druid/segment/data/IndexedInts.java +++ b/processing/src/main/java/io/druid/segment/data/IndexedInts.java @@ -19,6 +19,8 @@ package io.druid.segment.data; +import io.druid.query.monomorphicprocessing.CalledFromHotLoop; +import io.druid.query.monomorphicprocessing.HotLoopCallee; import it.unimi.dsi.fastutil.ints.IntIterable; import java.io.Closeable; @@ -26,9 +28,11 @@ import java.io.Closeable; /** * Get a int an index (array or list lookup abstraction without boxing). */ -public interface IndexedInts extends IntIterable, Closeable +public interface IndexedInts extends IntIterable, Closeable, HotLoopCallee { + @CalledFromHotLoop int size(); + @CalledFromHotLoop int get(int index); void fill(int index, int[] toFill); } diff --git a/processing/src/main/java/io/druid/segment/data/IndexedMultivalue.java b/processing/src/main/java/io/druid/segment/data/IndexedMultivalue.java index 1303396e713..1c3906ee148 100644 --- a/processing/src/main/java/io/druid/segment/data/IndexedMultivalue.java +++ b/processing/src/main/java/io/druid/segment/data/IndexedMultivalue.java @@ -23,4 +23,6 @@ import java.io.Closeable; public interface IndexedMultivalue extends Indexed, Closeable { + @Override + T get(int index); } diff --git a/processing/src/main/java/io/druid/segment/data/IntersectingOffset.java b/processing/src/main/java/io/druid/segment/data/IntersectingOffset.java index f3da84d0a1e..94e4cdc3792 100644 --- a/processing/src/main/java/io/druid/segment/data/IntersectingOffset.java +++ b/processing/src/main/java/io/druid/segment/data/IntersectingOffset.java @@ -23,7 +23,7 @@ import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; /** */ -public class IntersectingOffset implements Offset { +public class IntersectingOffset extends Offset { private final Offset lhs; private final Offset rhs; diff --git a/processing/src/main/java/io/druid/segment/data/ListIndexed.java b/processing/src/main/java/io/druid/segment/data/ListIndexed.java index feffc0d63a1..b06b7ef7c1e 100644 --- a/processing/src/main/java/io/druid/segment/data/ListIndexed.java +++ b/processing/src/main/java/io/druid/segment/data/ListIndexed.java @@ -19,6 +19,8 @@ package io.druid.segment.data; +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; + import java.util.Iterator; import java.util.List; @@ -67,4 +69,10 @@ public class ListIndexed implements Indexed { return baseList.iterator(); } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("baseList", baseList); + } } diff --git a/processing/src/main/java/io/druid/segment/data/Offset.java b/processing/src/main/java/io/druid/segment/data/Offset.java index 7e44dd7b8e7..7eff391e65d 100644 --- a/processing/src/main/java/io/druid/segment/data/Offset.java +++ b/processing/src/main/java/io/druid/segment/data/Offset.java @@ -19,19 +19,35 @@ package io.druid.segment.data; +import io.druid.annotations.SubclassesMustBePublic; import io.druid.query.monomorphicprocessing.CalledFromHotLoop; /** * The "mutable" version of a ReadableOffset. Introduces "increment()" and "withinBounds()" methods, which are * very similar to "next()" and "hasNext()" on the Iterator interface except increment() does not return a value. + * + * Annotated with {@link SubclassesMustBePublic} because Offset occurrences are replaced with a subclass in {@link + * io.druid.query.topn.Historical1SimpleDoubleAggPooledTopNScannerPrototype} and {@link + * io.druid.query.topn.HistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopNScannerPrototype} during + * specialization, and specialized version of those prototypes must be able to any subclass of Offset. */ -public interface Offset extends ReadableOffset +@SubclassesMustBePublic +public abstract class Offset implements ReadableOffset, Cloneable { @CalledFromHotLoop - void increment(); + public abstract void increment(); @CalledFromHotLoop - boolean withinBounds(); + public abstract boolean withinBounds(); - Offset clone(); + @Override + public Offset clone() + { + try { + return (Offset) super.clone(); + } + catch (CloneNotSupportedException e) { + throw new RuntimeException(e); + } + } } diff --git a/processing/src/main/java/io/druid/segment/data/RangeIndexedInts.java b/processing/src/main/java/io/druid/segment/data/RangeIndexedInts.java index 172c74e307f..0f426f32ec7 100644 --- a/processing/src/main/java/io/druid/segment/data/RangeIndexedInts.java +++ b/processing/src/main/java/io/druid/segment/data/RangeIndexedInts.java @@ -20,6 +20,7 @@ package io.druid.segment.data; import com.google.common.base.Preconditions; +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import it.unimi.dsi.fastutil.ints.IntIterator; import it.unimi.dsi.fastutil.ints.IntIterators; @@ -86,6 +87,11 @@ public class RangeIndexedInts implements IndexedInts @Override public void close() throws IOException { + } + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + // nothing to inspect } } diff --git a/processing/src/main/java/io/druid/segment/data/SingleIndexedInt.java b/processing/src/main/java/io/druid/segment/data/SingleIndexedInt.java index 2799cf9b31a..0d7eaac4b80 100644 --- a/processing/src/main/java/io/druid/segment/data/SingleIndexedInt.java +++ b/processing/src/main/java/io/druid/segment/data/SingleIndexedInt.java @@ -19,6 +19,7 @@ package io.druid.segment.data; +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import it.unimi.dsi.fastutil.ints.IntIterator; import it.unimi.dsi.fastutil.ints.IntIterators; @@ -64,4 +65,10 @@ public final class SingleIndexedInt implements IndexedInts public void close() throws IOException { } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + // nothing to inspect + } } diff --git a/processing/src/main/java/io/druid/segment/data/UnioningOffset.java b/processing/src/main/java/io/druid/segment/data/UnioningOffset.java index 05be50016b5..225a18961f1 100644 --- a/processing/src/main/java/io/druid/segment/data/UnioningOffset.java +++ b/processing/src/main/java/io/druid/segment/data/UnioningOffset.java @@ -23,7 +23,7 @@ import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; /** */ -public class UnioningOffset implements Offset +public class UnioningOffset extends Offset { private final Offset[] offsets = new Offset[2]; private final int[] offsetVals = new int[2]; diff --git a/processing/src/main/java/io/druid/segment/data/VSizeIndexed.java b/processing/src/main/java/io/druid/segment/data/VSizeIndexed.java index 873c5675216..1f7eb057b74 100644 --- a/processing/src/main/java/io/druid/segment/data/VSizeIndexed.java +++ b/processing/src/main/java/io/druid/segment/data/VSizeIndexed.java @@ -24,6 +24,7 @@ import io.druid.common.utils.SerializerUtils; import io.druid.io.ZeroCopyByteArrayOutputStream; import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import java.io.IOException; import java.nio.ByteBuffer; @@ -190,6 +191,12 @@ public class VSizeIndexed implements IndexedMultivalue // no-op } + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("theBuffer", theBuffer); + } + public WritableSupplier> asWritableSupplier() { return new VSizeIndexedSupplier(this); } diff --git a/processing/src/main/java/io/druid/segment/data/VSizeIndexedInts.java b/processing/src/main/java/io/druid/segment/data/VSizeIndexedInts.java index 95b72122aeb..a35c58813f5 100644 --- a/processing/src/main/java/io/druid/segment/data/VSizeIndexedInts.java +++ b/processing/src/main/java/io/druid/segment/data/VSizeIndexedInts.java @@ -22,6 +22,7 @@ package io.druid.segment.data; import com.google.common.collect.Lists; import com.google.common.primitives.Ints; import io.druid.java.util.common.IAE; +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import it.unimi.dsi.fastutil.ints.IntIterator; import java.io.IOException; @@ -220,7 +221,12 @@ public class VSizeIndexedInts implements IndexedInts, Comparable asWritableSupplier() { diff --git a/processing/src/main/java/io/druid/segment/data/ZeroIndexedInts.java b/processing/src/main/java/io/druid/segment/data/ZeroIndexedInts.java index 1ee254da723..a26e215d8a2 100644 --- a/processing/src/main/java/io/druid/segment/data/ZeroIndexedInts.java +++ b/processing/src/main/java/io/druid/segment/data/ZeroIndexedInts.java @@ -19,6 +19,7 @@ package io.druid.segment.data; +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import it.unimi.dsi.fastutil.ints.IntIterator; import it.unimi.dsi.fastutil.ints.IntIterators; @@ -70,6 +71,11 @@ public class ZeroIndexedInts implements IndexedInts @Override public void close() throws IOException { + } + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + // nothing to inspect } } diff --git a/processing/src/main/java/io/druid/segment/filter/AndFilter.java b/processing/src/main/java/io/druid/segment/filter/AndFilter.java index dbe4c7f45f0..5b993b263e6 100644 --- a/processing/src/main/java/io/druid/segment/filter/AndFilter.java +++ b/processing/src/main/java/io/druid/segment/filter/AndFilter.java @@ -28,6 +28,7 @@ import io.druid.query.filter.BooleanFilter; import io.druid.query.filter.Filter; import io.druid.query.filter.RowOffsetMatcherFactory; import io.druid.query.filter.ValueMatcher; +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import io.druid.segment.ColumnSelector; import io.druid.segment.ColumnSelectorFactory; @@ -39,13 +40,13 @@ import java.util.List; public class AndFilter implements BooleanFilter { private static final Joiner AND_JOINER = Joiner.on(" && "); + static final ValueMatcher[] EMPTY_VALUE_MATCHER_ARRAY = new ValueMatcher[0]; private final List filters; - public AndFilter( - List filters - ) + public AndFilter(List filters) { + Preconditions.checkArgument(filters.size() > 0, "Can't construct empty AndFilter"); this.filters = filters; } @@ -80,10 +81,6 @@ public class AndFilter implements BooleanFilter @Override public ValueMatcher makeMatcher(ColumnSelectorFactory factory) { - if (filters.size() == 0) { - return BooleanValueMatcher.of(false); - } - final ValueMatcher[] matchers = new ValueMatcher[filters.size()]; for (int i = 0; i < filters.size(); i++) { @@ -117,19 +114,7 @@ public class AndFilter implements BooleanFilter matchers.add(0, offsetMatcher); } - return new ValueMatcher() - { - @Override - public boolean matches() - { - for (ValueMatcher valueMatcher : matchers) { - if (!valueMatcher.matches()) { - return false; - } - } - return true; - } - }; + return makeMatcher(matchers.toArray(EMPTY_VALUE_MATCHER_ARRAY)); } @Override @@ -181,6 +166,7 @@ public class AndFilter implements BooleanFilter private ValueMatcher makeMatcher(final ValueMatcher[] baseMatchers) { + Preconditions.checkState(baseMatchers.length > 0); if (baseMatchers.length == 1) { return baseMatchers[0]; } @@ -197,6 +183,15 @@ public class AndFilter implements BooleanFilter } return true; } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("firstBaseMatcher", baseMatchers[0]); + inspector.visit("secondBaseMatcher", baseMatchers[1]); + // Don't inspect the 3rd and all consequent baseMatchers, cut runtime shape combinations at this point. + // Anyway if the filter is so complex, Hotspot won't inline all calls because of the inline limit. + } }; } diff --git a/processing/src/main/java/io/druid/segment/filter/ColumnComparisonFilter.java b/processing/src/main/java/io/druid/segment/filter/ColumnComparisonFilter.java index f8e126a2424..698414435a0 100644 --- a/processing/src/main/java/io/druid/segment/filter/ColumnComparisonFilter.java +++ b/processing/src/main/java/io/druid/segment/filter/ColumnComparisonFilter.java @@ -29,6 +29,7 @@ import io.druid.query.filter.ValueGetter; import io.druid.query.filter.ValueMatcher; import io.druid.query.filter.ValueMatcherColumnSelectorStrategy; import io.druid.query.filter.ValueMatcherColumnSelectorStrategyFactory; +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import io.druid.segment.ColumnSelector; import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.DimensionHandlerUtils; @@ -74,6 +75,9 @@ public class ColumnComparisonFilter implements Filter } public static ValueMatcher makeValueMatcher(final ValueGetter[] valueGetters) { + if (valueGetters.length == 0) { + return BooleanValueMatcher.of(true); + } return new ValueMatcher() { @Override @@ -93,6 +97,13 @@ public class ColumnComparisonFilter implements Filter } return true; } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + // All value getters are likely the same or similar (in terms of runtime shape), so inspecting only one of them. + inspector.visit("oneValueGetter", valueGetters[0]); + } }; } diff --git a/processing/src/main/java/io/druid/segment/filter/FalseValueMatcher.java b/processing/src/main/java/io/druid/segment/filter/FalseValueMatcher.java index 189dae6a633..2467dd79dcb 100644 --- a/processing/src/main/java/io/druid/segment/filter/FalseValueMatcher.java +++ b/processing/src/main/java/io/druid/segment/filter/FalseValueMatcher.java @@ -20,6 +20,7 @@ package io.druid.segment.filter; import io.druid.query.filter.ValueMatcher; +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; final class FalseValueMatcher implements ValueMatcher { @@ -39,4 +40,10 @@ final class FalseValueMatcher implements ValueMatcher { return false; } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + // nothing to inspect + } } diff --git a/processing/src/main/java/io/druid/segment/filter/Filters.java b/processing/src/main/java/io/druid/segment/filter/Filters.java index 756bcee52e1..a8e8a34a74a 100644 --- a/processing/src/main/java/io/druid/segment/filter/Filters.java +++ b/processing/src/main/java/io/druid/segment/filter/Filters.java @@ -38,6 +38,7 @@ import io.druid.query.filter.Filter; import io.druid.query.filter.ValueMatcher; import io.druid.query.filter.ValueMatcherColumnSelectorStrategy; import io.druid.query.filter.ValueMatcherColumnSelectorStrategyFactory; +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import io.druid.segment.ColumnSelector; import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.DimensionHandlerUtils; @@ -453,6 +454,13 @@ public class Filters { return predicate.applyLong(longSelector.get()); } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("longSelector", longSelector); + inspector.visit("predicate", predicate); + } }; } diff --git a/processing/src/main/java/io/druid/segment/filter/NotFilter.java b/processing/src/main/java/io/druid/segment/filter/NotFilter.java index cbba340b5c8..35a365bc63e 100644 --- a/processing/src/main/java/io/druid/segment/filter/NotFilter.java +++ b/processing/src/main/java/io/druid/segment/filter/NotFilter.java @@ -23,6 +23,7 @@ import io.druid.collections.bitmap.ImmutableBitmap; import io.druid.query.filter.BitmapIndexSelector; import io.druid.query.filter.Filter; import io.druid.query.filter.ValueMatcher; +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import io.druid.segment.ColumnSelector; import io.druid.segment.ColumnSelectorFactory; @@ -60,6 +61,12 @@ public class NotFilter implements Filter { return !baseMatcher.matches(); } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("baseMatcher", baseMatcher); + } }; } diff --git a/processing/src/main/java/io/druid/segment/filter/OrFilter.java b/processing/src/main/java/io/druid/segment/filter/OrFilter.java index f882facd2d2..58a99f5331e 100644 --- a/processing/src/main/java/io/druid/segment/filter/OrFilter.java +++ b/processing/src/main/java/io/druid/segment/filter/OrFilter.java @@ -20,6 +20,7 @@ package io.druid.segment.filter; import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import io.druid.collections.bitmap.ImmutableBitmap; import io.druid.query.filter.BitmapIndexSelector; @@ -27,6 +28,7 @@ import io.druid.query.filter.BooleanFilter; import io.druid.query.filter.Filter; import io.druid.query.filter.RowOffsetMatcherFactory; import io.druid.query.filter.ValueMatcher; +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import io.druid.segment.ColumnSelector; import io.druid.segment.ColumnSelectorFactory; @@ -41,13 +43,9 @@ public class OrFilter implements BooleanFilter private final List filters; - public OrFilter( - List filters - ) + public OrFilter(List filters) { - if (filters.size() == 0) { - throw new IllegalArgumentException("Can't construct empty OrFilter (the universe does not exist)"); - } + Preconditions.checkArgument(filters.size() > 0, "Can't construct empty OrFilter (the universe does not exist)"); this.filters = filters; } @@ -103,23 +101,13 @@ public class OrFilter implements BooleanFilter matchers.add(0, offsetMatcher); } - return new ValueMatcher() - { - @Override - public boolean matches() - { - for (ValueMatcher valueMatcher : matchers) { - if (valueMatcher.matches()) { - return true; - } - } - return false; - } - }; + return makeMatcher(matchers.toArray(AndFilter.EMPTY_VALUE_MATCHER_ARRAY)); } private ValueMatcher makeMatcher(final ValueMatcher[] baseMatchers){ + Preconditions.checkState(baseMatchers.length > 0); + if (baseMatchers.length == 1) { return baseMatchers[0]; } @@ -136,6 +124,15 @@ public class OrFilter implements BooleanFilter } return false; } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("firstBaseMatcher", baseMatchers[0]); + inspector.visit("secondBaseMatcher", baseMatchers[1]); + // Don't inspect the 3rd and all consequent baseMatchers, cut runtime shape combinations at this point. + // Anyway if the filter is so complex, Hotspot won't inline all calls because of the inline limit. + } }; } diff --git a/processing/src/main/java/io/druid/segment/filter/TrueValueMatcher.java b/processing/src/main/java/io/druid/segment/filter/TrueValueMatcher.java index 288e4387082..bfcdb7a3213 100644 --- a/processing/src/main/java/io/druid/segment/filter/TrueValueMatcher.java +++ b/processing/src/main/java/io/druid/segment/filter/TrueValueMatcher.java @@ -20,6 +20,7 @@ package io.druid.segment.filter; import io.druid.query.filter.ValueMatcher; +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; final class TrueValueMatcher implements ValueMatcher { @@ -39,4 +40,10 @@ final class TrueValueMatcher implements ValueMatcher { return true; } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + // nothing to inspect + } } diff --git a/processing/src/main/java/io/druid/segment/historical/HistoricalCursor.java b/processing/src/main/java/io/druid/segment/historical/HistoricalCursor.java new file mode 100644 index 00000000000..ce289ce27d0 --- /dev/null +++ b/processing/src/main/java/io/druid/segment/historical/HistoricalCursor.java @@ -0,0 +1,26 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.historical; + +import io.druid.segment.Cursor; + +public interface HistoricalCursor extends Cursor, OffsetHolder +{ +} diff --git a/processing/src/main/java/io/druid/segment/historical/HistoricalDimensionSelector.java b/processing/src/main/java/io/druid/segment/historical/HistoricalDimensionSelector.java new file mode 100644 index 00000000000..698ed2f4d45 --- /dev/null +++ b/processing/src/main/java/io/druid/segment/historical/HistoricalDimensionSelector.java @@ -0,0 +1,33 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.historical; + +import io.druid.query.monomorphicprocessing.CalledFromHotLoop; +import io.druid.segment.DimensionSelector; +import io.druid.segment.data.IndexedInts; + +/** + * Specialization for {@link DimensionSelector} queryable via offsets from {@link HistoricalCursor}. + */ +public interface HistoricalDimensionSelector extends DimensionSelector +{ + @CalledFromHotLoop + IndexedInts getRow(int offset); +} diff --git a/processing/src/main/java/io/druid/segment/historical/HistoricalFloatColumnSelector.java b/processing/src/main/java/io/druid/segment/historical/HistoricalFloatColumnSelector.java new file mode 100644 index 00000000000..d2b91dab76a --- /dev/null +++ b/processing/src/main/java/io/druid/segment/historical/HistoricalFloatColumnSelector.java @@ -0,0 +1,29 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.historical; + +import io.druid.query.monomorphicprocessing.CalledFromHotLoop; +import io.druid.segment.FloatColumnSelector; + +public interface HistoricalFloatColumnSelector extends FloatColumnSelector +{ + @CalledFromHotLoop + float get(int offset); +} diff --git a/processing/src/main/java/io/druid/segment/historical/OffsetHolder.java b/processing/src/main/java/io/druid/segment/historical/OffsetHolder.java new file mode 100644 index 00000000000..438617d7207 --- /dev/null +++ b/processing/src/main/java/io/druid/segment/historical/OffsetHolder.java @@ -0,0 +1,27 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.historical; + +import io.druid.segment.data.Offset; + +public interface OffsetHolder +{ + Offset getOffset(); +} diff --git a/processing/src/main/java/io/druid/segment/historical/SingleValueHistoricalDimensionSelector.java b/processing/src/main/java/io/druid/segment/historical/SingleValueHistoricalDimensionSelector.java new file mode 100644 index 00000000000..f5f6ba29d65 --- /dev/null +++ b/processing/src/main/java/io/druid/segment/historical/SingleValueHistoricalDimensionSelector.java @@ -0,0 +1,30 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.historical; + +import io.druid.query.monomorphicprocessing.CalledFromHotLoop; +import io.druid.segment.SingleValueDimensionSelector; + +public interface SingleValueHistoricalDimensionSelector + extends HistoricalDimensionSelector, SingleValueDimensionSelector +{ + @CalledFromHotLoop + int getRowValue(int offset); +} diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java index a00ba970355..0b3f3cd1df1 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java @@ -25,6 +25,7 @@ import com.google.common.collect.Maps; import io.druid.collections.bitmap.BitmapFactory; import io.druid.collections.bitmap.MutableBitmap; import io.druid.java.util.common.logger.Logger; +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import io.druid.segment.DimensionHandler; import io.druid.segment.DimensionIndexer; import io.druid.segment.IndexableAdapter; @@ -301,6 +302,12 @@ public class IncrementalIndexAdapter implements IndexableAdapter public void close() throws IOException { } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("bitmapIndex", bitmapIndex); + } } @Override diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java index 2768d104759..32d7064fa40 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -490,6 +490,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter @Override public void inspectRuntimeShape(RuntimeShapeInspector inspector) { + // nothing to inspect } } return new TimeLongColumnSelector(); diff --git a/processing/src/main/java/io/druid/segment/virtual/BaseSingleValueDimensionSelector.java b/processing/src/main/java/io/druid/segment/virtual/BaseSingleValueDimensionSelector.java index 1207c275c9f..d7425cd08c0 100644 --- a/processing/src/main/java/io/druid/segment/virtual/BaseSingleValueDimensionSelector.java +++ b/processing/src/main/java/io/druid/segment/virtual/BaseSingleValueDimensionSelector.java @@ -22,15 +22,17 @@ package io.druid.segment.virtual; import com.google.common.base.Predicate; import io.druid.query.filter.ValueMatcher; import io.druid.query.monomorphicprocessing.CalledFromHotLoop; +import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; import io.druid.segment.DimensionSelector; import io.druid.segment.IdLookup; +import io.druid.segment.SingleValueDimensionSelector; import io.druid.segment.data.IndexedInts; import io.druid.segment.data.ZeroIndexedInts; import javax.annotation.Nullable; import java.util.Objects; -public abstract class BaseSingleValueDimensionSelector implements DimensionSelector +public abstract class BaseSingleValueDimensionSelector implements SingleValueDimensionSelector { @CalledFromHotLoop protected abstract String getValue(); @@ -41,6 +43,12 @@ public abstract class BaseSingleValueDimensionSelector implements DimensionSelec return ZeroIndexedInts.instance(); } + @Override + public int getRowValue() + { + return 0; + } + @Override public int getValueCardinality() { @@ -63,6 +71,12 @@ public abstract class BaseSingleValueDimensionSelector implements DimensionSelec { return Objects.equals(getValue(), value); } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("selector", BaseSingleValueDimensionSelector.this); + } }; } @@ -76,6 +90,13 @@ public abstract class BaseSingleValueDimensionSelector implements DimensionSelec { return predicate.apply(getValue()); } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + inspector.visit("selector", BaseSingleValueDimensionSelector.this); + inspector.visit("predicate", predicate); + } }; } diff --git a/processing/src/main/java/io/druid/segment/virtual/ExpressionSelectors.java b/processing/src/main/java/io/druid/segment/virtual/ExpressionSelectors.java index 9db9f9ddab4..757f7deddef 100644 --- a/processing/src/main/java/io/druid/segment/virtual/ExpressionSelectors.java +++ b/processing/src/main/java/io/druid/segment/virtual/ExpressionSelectors.java @@ -130,6 +130,7 @@ public class ExpressionSelectors public void inspectRuntimeShape(RuntimeShapeInspector inspector) { inspector.visit("baseSelector", baseSelector); + inspector.visit("extractionFn", extractionFn); } } return new ExtractionExpressionDimensionSelector(); diff --git a/processing/src/test/java/io/druid/query/aggregation/FilteredAggregatorTest.java b/processing/src/test/java/io/druid/query/aggregation/FilteredAggregatorTest.java index b2db0636c67..7671700040c 100644 --- a/processing/src/test/java/io/druid/query/aggregation/FilteredAggregatorTest.java +++ b/processing/src/test/java/io/druid/query/aggregation/FilteredAggregatorTest.java @@ -172,6 +172,7 @@ public class FilteredAggregatorTest @Override public void inspectRuntimeShape(RuntimeShapeInspector inspector) { + // Don't care about runtime shape in tests } } ); diff --git a/processing/src/test/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorTest.java b/processing/src/test/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorTest.java index a19a7396fcd..0f53090c482 100644 --- a/processing/src/test/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorTest.java +++ b/processing/src/test/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorTest.java @@ -157,7 +157,12 @@ public class CardinalityAggregatorTest @Override public void close() throws IOException { + } + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + // Don't care about runtime shape in tests } }; } @@ -210,6 +215,7 @@ public class CardinalityAggregatorTest @Override public void inspectRuntimeShape(RuntimeShapeInspector inspector) { + // Don't care about runtime shape in tests } } diff --git a/processing/src/test/java/io/druid/query/dimension/TestDimensionSelector.java b/processing/src/test/java/io/druid/query/dimension/TestDimensionSelector.java index e31b51a7ac1..a105b6e2378 100644 --- a/processing/src/test/java/io/druid/query/dimension/TestDimensionSelector.java +++ b/processing/src/test/java/io/druid/query/dimension/TestDimensionSelector.java @@ -97,5 +97,6 @@ class TestDimensionSelector implements DimensionSelector @Override public void inspectRuntimeShape(RuntimeShapeInspector inspector) { + // Don't care about runtime shape in tests } } diff --git a/processing/src/test/java/io/druid/query/monomorphicprocessing/StringRuntimeShapeTest.java b/processing/src/test/java/io/druid/query/monomorphicprocessing/StringRuntimeShapeTest.java index 189230710a0..a7a7566feb6 100644 --- a/processing/src/test/java/io/druid/query/monomorphicprocessing/StringRuntimeShapeTest.java +++ b/processing/src/test/java/io/druid/query/monomorphicprocessing/StringRuntimeShapeTest.java @@ -34,6 +34,7 @@ public class StringRuntimeShapeTest @Override public void inspectRuntimeShape(RuntimeShapeInspector inspector) { + // Don't care about runtime shape in tests } } diff --git a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java index 67dcbe3780e..5bcc27c7ac1 100644 --- a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java @@ -28,9 +28,12 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import com.google.common.primitives.Doubles; +import com.google.common.primitives.Longs; import io.druid.collections.StupidPool; import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; +import io.druid.java.util.common.Pair; import io.druid.java.util.common.granularity.Granularities; import io.druid.java.util.common.granularity.Granularity; import io.druid.java.util.common.guava.Sequence; @@ -76,7 +79,6 @@ import io.druid.query.filter.SelectorDimFilter; import io.druid.query.lookup.LookupExtractionFn; import io.druid.query.ordering.StringComparators; import io.druid.query.spec.MultipleIntervalSegmentSpec; -import io.druid.query.timeseries.TimeseriesQuery; import io.druid.segment.TestHelper; import io.druid.segment.column.Column; import io.druid.segment.column.ValueType; @@ -100,6 +102,7 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** */ @@ -111,13 +114,15 @@ public class TopNQueryRunnerTest { List>> retVal = queryRunners(); List parameters = new ArrayList<>(); - for (int i = 0; i < 8; i++) { + for (int i = 0; i < 32; i++) { for (QueryRunner> firstParameter : retVal) { - Object[] params = new Object[4]; + Object[] params = new Object[6]; params[0] = firstParameter; params[1] = (i & 1) != 0; params[2] = (i & 2) != 0; params[3] = (i & 4) != 0; + params[4] = (i & 8) != 0; + params[5] = (i & 16) != 0; parameters.add(params); } } @@ -174,12 +179,20 @@ public class TopNQueryRunnerTest QueryRunner> runner, boolean specializeGeneric1AggPooledTopN, boolean specializeGeneric2AggPooledTopN, + boolean specializeHistorical1SimpleDoubleAggPooledTopN, + boolean specializeHistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopN, boolean duplicateSingleAggregatorQueries ) { this.runner = runner; - PooledTopNAlgorithm.specializeGeneric1AggPooledTopN = specializeGeneric1AggPooledTopN; - PooledTopNAlgorithm.specializeGeneric2AggPooledTopN = specializeGeneric2AggPooledTopN; + PooledTopNAlgorithm.setSpecializeGeneric1AggPooledTopN(specializeGeneric1AggPooledTopN); + PooledTopNAlgorithm.setSpecializeGeneric2AggPooledTopN(specializeGeneric2AggPooledTopN); + PooledTopNAlgorithm.setSpecializeHistorical1SimpleDoubleAggPooledTopN( + specializeHistorical1SimpleDoubleAggPooledTopN + ); + PooledTopNAlgorithm.setSpecializeHistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopN( + specializeHistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopN + ); this.duplicateSingleAggregatorQueries = duplicateSingleAggregatorQueries; } @@ -1886,19 +1899,7 @@ public class TopNQueryRunnerTest .build(); Granularity gran = Granularities.DAY; - TimeseriesQuery tsQuery = Druids.newTimeseriesQueryBuilder() - .dataSource(QueryRunnerTestHelper.dataSource) - .granularity(gran) - .intervals(QueryRunnerTestHelper.fullOnInterval) - .aggregators( - Arrays.asList( - QueryRunnerTestHelper.rowsCount, - QueryRunnerTestHelper.indexDoubleSum, - QueryRunnerTestHelper.qualityUniques - ) - ) - .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant)) - .build(); + List> expectedResults = Arrays.asList( new Result<>( new DateTime("2011-01-12T00:00:00.000Z"), @@ -4966,79 +4967,98 @@ public class TopNQueryRunnerTest @Test public void testFullOnTopNWithAggsOnNumericDims() { - TopNQuery query = new TopNQueryBuilder() - .dataSource(QueryRunnerTestHelper.dataSource) - .granularity(QueryRunnerTestHelper.allGran) - .dimension(QueryRunnerTestHelper.marketDimension) - .metric(QueryRunnerTestHelper.indexMetric) - .threshold(4) - .intervals(QueryRunnerTestHelper.fullOnInterval) - .aggregators( - Lists.newArrayList( - Iterables.concat( - QueryRunnerTestHelper.commonAggregators, - Lists.newArrayList( - new DoubleMaxAggregatorFactory("maxIndex", "index"), - new DoubleMinAggregatorFactory("minIndex", "index"), - new LongSumAggregatorFactory("qlLong", "qualityLong"), - new DoubleSumAggregatorFactory("qlFloat", "qualityLong"), - new DoubleSumAggregatorFactory("qfFloat", "qualityFloat"), - new LongSumAggregatorFactory("qfLong", "qualityFloat") - ) - ) - ) - ) - .postAggregators(Arrays.asList(QueryRunnerTestHelper.addRowsIndexConstant)) - .build(); - - List> expectedResults = Arrays.asList( - new Result( - new DateTime("2011-01-12T00:00:00.000Z"), - new TopNResultValue( - Arrays.>asList( - ImmutableMap.builder() - .put(QueryRunnerTestHelper.marketDimension, "total_market") - .put("rows", 186L) - .put("index", 215679.82879638672D) - .put("addRowsIndexConstant", 215866.82879638672D) - .put("uniques", QueryRunnerTestHelper.UNIQUES_2) - .put("maxIndex", 1743.9217529296875D) - .put("minIndex", 792.3260498046875D) - .put("qlLong", 279000L) - .put("qlFloat", 279000.0) - .put("qfFloat", 2790000.0) - .put("qfLong", 2790000L) - .build(), - ImmutableMap.builder() - .put(QueryRunnerTestHelper.marketDimension, "upfront") - .put("rows", 186L) - .put("index", 192046.1060180664D) - .put("addRowsIndexConstant", 192233.1060180664D) - .put("uniques", QueryRunnerTestHelper.UNIQUES_2) - .put("maxIndex", 1870.06103515625D) - .put("minIndex", 545.9906005859375D) - .put("qlLong", 279000L) - .put("qlFloat", 279000.0) - .put("qfFloat", 2790000.0) - .put("qfLong", 2790000L) - .build(), - ImmutableMap.builder() - .put(QueryRunnerTestHelper.marketDimension, "spot") - .put("rows", 837L) - .put("index", 95606.57232284546D) - .put("addRowsIndexConstant", 96444.57232284546D) - .put("uniques", QueryRunnerTestHelper.UNIQUES_9) - .put("maxIndex", 277.2735290527344D) - .put("minIndex", 59.02102279663086D) - .put("qlLong", 1171800L) - .put("qlFloat", 1171800.0) - .put("qfFloat", 11718000.0) - .put("qfLong", 11718000L) - .build() - ) - ) - ) + List>> aggregations = new ArrayList<>(); + aggregations.add(new Pair<>( + QueryRunnerTestHelper.rowsCount, + Longs.asList(186L, 186L, 837L) + )); + Pair> indexAggregation = new Pair<>( + QueryRunnerTestHelper.indexDoubleSum, + Doubles.asList(215679.82879638672D, 192046.1060180664D, 95606.57232284546D) ); - assertExpectedResults(expectedResults, query); + aggregations.add(indexAggregation); + aggregations.add(new Pair<>( + QueryRunnerTestHelper.qualityUniques, + Doubles.asList(QueryRunnerTestHelper.UNIQUES_2, QueryRunnerTestHelper.UNIQUES_2, QueryRunnerTestHelper.UNIQUES_9) + )); + aggregations.add(new Pair<>( + new DoubleMaxAggregatorFactory("maxIndex", "index"), + Doubles.asList(1743.9217529296875D, 1870.06103515625D, 277.2735290527344D) + )); + aggregations.add(new Pair<>( + new DoubleMinAggregatorFactory("minIndex", "index"), + Doubles.asList(792.3260498046875D, 545.9906005859375D, 59.02102279663086D) + )); + aggregations.add(new Pair<>( + new LongSumAggregatorFactory("qlLong", "qualityLong"), + Longs.asList(279000L, 279000L, 1171800L) + )); + aggregations.add(new Pair<>( + new DoubleSumAggregatorFactory("qlFloat", "qualityLong"), + Doubles.asList(279000.0, 279000.0, 1171800.0) + )); + aggregations.add(new Pair<>( + new DoubleSumAggregatorFactory("qfFloat", "qualityFloat"), + Doubles.asList(2790000.0, 2790000.0, 11718000.0) + )); + aggregations.add(new Pair<>( + new LongSumAggregatorFactory("qfLong", "qualityFloat"), + Longs.asList(2790000L, 2790000L, 11718000L) + )); + + List>>> aggregationCombinations = new ArrayList<>(); + for (Pair> aggregation : aggregations) { + aggregationCombinations.add(Collections.singletonList(aggregation)); + } + aggregationCombinations.add(aggregations); + + for (List>> aggregationCombination : aggregationCombinations) { + boolean hasIndexAggregator = aggregationCombination.stream().anyMatch(agg -> "index".equals(agg.lhs.getName())); + boolean hasRowsAggregator = aggregationCombination.stream().anyMatch(agg -> "rows".equals(agg.lhs.getName())); + TopNQueryBuilder queryBuilder = new TopNQueryBuilder() + .dataSource(QueryRunnerTestHelper.dataSource) + .granularity(QueryRunnerTestHelper.allGran) + .dimension(QueryRunnerTestHelper.marketDimension) + .threshold(4) + .intervals(QueryRunnerTestHelper.fullOnInterval) + .aggregators(aggregationCombination.stream().map(agg -> agg.lhs).collect(Collectors.toList())); + String metric; + if (hasIndexAggregator) { + metric = "index"; + } else { + metric = aggregationCombination.get(0).lhs.getName(); + } + queryBuilder.metric(metric); + if (hasIndexAggregator && hasRowsAggregator) { + queryBuilder.postAggregators(Collections.singletonList(QueryRunnerTestHelper.addRowsIndexConstant)); + } + TopNQuery query = queryBuilder.build(); + + ImmutableMap.Builder row1 = ImmutableMap.builder() + .put(QueryRunnerTestHelper.marketDimension, "total_market"); + ImmutableMap.Builder row2 = ImmutableMap.builder() + .put(QueryRunnerTestHelper.marketDimension, "upfront"); + ImmutableMap.Builder row3 = ImmutableMap.builder() + .put(QueryRunnerTestHelper.marketDimension, "spot"); + if (hasIndexAggregator && hasRowsAggregator) { + row1.put("addRowsIndexConstant", 215866.82879638672D); + row2.put("addRowsIndexConstant", 192233.1060180664D); + row3.put("addRowsIndexConstant", 96444.57232284546D); + } + aggregationCombination.forEach(agg -> { + row1.put(agg.lhs.getName(), agg.rhs.get(0)); + row2.put(agg.lhs.getName(), agg.rhs.get(1)); + row3.put(agg.lhs.getName(), agg.rhs.get(2)); + }); + List> rows = Lists.newArrayList(row1.build(), row2.build(), row3.build()); + rows.sort((r1, r2) -> ((Comparable) r2.get(metric)).compareTo(r1.get(metric))); + List> expectedResults = Collections.singletonList( + new Result<>( + new DateTime("2011-01-12T00:00:00.000Z"), + new TopNResultValue(rows) + ) + ); + assertExpectedResults(expectedResults, query); + } } } diff --git a/processing/src/test/java/io/druid/segment/TestFloatColumnSelector.java b/processing/src/test/java/io/druid/segment/TestFloatColumnSelector.java index 3d53ac0effc..8d6bcf45fcb 100644 --- a/processing/src/test/java/io/druid/segment/TestFloatColumnSelector.java +++ b/processing/src/test/java/io/druid/segment/TestFloatColumnSelector.java @@ -23,11 +23,9 @@ import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; public abstract class TestFloatColumnSelector implements FloatColumnSelector { - /** - * Don't care about runtime shape in tests - */ @Override public void inspectRuntimeShape(RuntimeShapeInspector inspector) { + // Don't care about runtime shape in tests } } diff --git a/processing/src/test/java/io/druid/segment/TestLongColumnSelector.java b/processing/src/test/java/io/druid/segment/TestLongColumnSelector.java index f4b5a32f258..6c6db1da47a 100644 --- a/processing/src/test/java/io/druid/segment/TestLongColumnSelector.java +++ b/processing/src/test/java/io/druid/segment/TestLongColumnSelector.java @@ -23,11 +23,9 @@ import io.druid.query.monomorphicprocessing.RuntimeShapeInspector; public abstract class TestLongColumnSelector implements LongColumnSelector { - /** - * Don't care about runtime shape in tests - */ @Override public void inspectRuntimeShape(RuntimeShapeInspector inspector) { + // Don't care about runtime shape in tests } } diff --git a/processing/src/test/java/io/druid/segment/data/IndexedIntsTest.java b/processing/src/test/java/io/druid/segment/data/IndexedIntsTest.java index 3e8e7398f9f..ef284067997 100644 --- a/processing/src/test/java/io/druid/segment/data/IndexedIntsTest.java +++ b/processing/src/test/java/io/druid/segment/data/IndexedIntsTest.java @@ -42,7 +42,8 @@ public class IndexedIntsTest { return Arrays.asList( new Object[][]{ - {VSizeIndexedInts.fromArray(array)} + {VSizeIndexedInts.fromArray(array)}, + {ArrayBasedIndexedInts.of(array)} } ); } diff --git a/processing/src/test/java/io/druid/segment/virtual/VirtualColumnsTest.java b/processing/src/test/java/io/druid/segment/virtual/VirtualColumnsTest.java index 7b8f8c8abf3..7235ccc5b2a 100644 --- a/processing/src/test/java/io/druid/segment/virtual/VirtualColumnsTest.java +++ b/processing/src/test/java/io/druid/segment/virtual/VirtualColumnsTest.java @@ -46,15 +46,13 @@ import io.druid.segment.column.ColumnCapabilities; import io.druid.segment.column.ColumnCapabilitiesImpl; import io.druid.segment.column.ValueType; import io.druid.segment.data.IndexedInts; -import it.unimi.dsi.fastutil.ints.IntIterator; -import it.unimi.dsi.fastutil.ints.IntIterators; +import io.druid.segment.data.ZeroIndexedInts; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; import javax.annotation.Nullable; -import java.io.IOException; import java.util.Arrays; import java.util.List; @@ -280,38 +278,7 @@ public class VirtualColumnsTest @Override public IndexedInts getRow() { - return new IndexedInts() - { - @Override - public int size() - { - return 1; - } - - @Override - public int get(int index) - { - return 0; - } - - @Override - public IntIterator iterator() - { - return IntIterators.singleton(0); - } - - @Override - public void fill(int index, int[] toFill) - { - throw new UnsupportedOperationException("fill not supported"); - } - - @Override - public void close() throws IOException - { - - } - }; + return ZeroIndexedInts.instance(); } @Override @@ -362,6 +329,7 @@ public class VirtualColumnsTest @Override public void inspectRuntimeShape(RuntimeShapeInspector inspector) { + // Don't care about runtime shape in tests } };