Monomorphic processing of TopN queries with simple double aggregators over historical segments (part of #3798) (#4079)

* Monomorphic processing of topN queries with simple double aggregators and historical segments

* Add CalledFromHotLoop annocations to specialized methods in SimpleDoubleBufferAggregator

* Fix a bug in Historical1SimpleDoubleAggPooledTopNScannerPrototype

* Fix a bug in SpecializationService

* In SpecializationService, emit maxSpecializations warning only once

* Make GenericIndexed.theBuffer final

* Address comments

* Newline

* Reapply 439c906 (Make GenericIndexed.theBuffer final)

* Remove extra PooledTopNAlgorithm.capabilities field

* Improve CachingIndexed.inspectRuntimeShape()

* Fix CompressedVSizeIntsIndexedSupplier.inspectRuntimeShape()

* Don't override inspectRuntimeShape() in subclasses of CompressedVSizeIndexedInts

* Annotate methods in specializations of DimensionSelector and FloatColumnSelector with @CalledFromHotLoop

* Make ValueMatcher to implement HotLoopCallee

* Doc fix

* Fix inspectRuntimeShape() impl in ExpressionSelectors

* INFO logging of specialization events

* Remove modificator

* Fix OrFilter

* Fix AndFilter

* Refactor PooledTopNAlgorithm.scanAndAggregate()

* Small refactoring

* Add 'nothing to inspect' messages in empty HotLoopCallee.inspectRuntimeShape() implementations

* Don't care about runtime shape in tests

* Fix accessor bugs in Historical1SimpleDoubleAggPooledTopNScannerPrototype and HistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopNScannerPrototype, cover them with tests

* Doc wording

* Address comments

* Remove MagicAccessorBridge and ensure Offset subclasses are public

* Attach error message to element
This commit is contained in:
Roman Leventov 2017-05-16 18:19:55 -05:00 committed by Charles Allen
parent b7a52286e8
commit d400f23791
93 changed files with 1880 additions and 739 deletions

View File

@ -0,0 +1,35 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.annotations;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Documented
@Inherited
@Retention(RetentionPolicy.SOURCE)
@Target(ElementType.TYPE)
public @interface SubclassesMustBePublic
{
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.annotations;
import javax.annotation.processing.AbstractProcessor;
import javax.annotation.processing.RoundEnvironment;
import javax.annotation.processing.SupportedAnnotationTypes;
import javax.lang.model.element.Element;
import javax.lang.model.element.Modifier;
import javax.lang.model.element.TypeElement;
import javax.tools.Diagnostic;
import java.util.Set;
@SupportedAnnotationTypes("io.druid.annotations.SubclassesMustBePublic")
public class SubclassesMustBePublicAnnotationProcessor extends AbstractProcessor
{
@Override
public boolean process(Set<? extends TypeElement> annotations, RoundEnvironment roundEnv)
{
for (TypeElement annotation : annotations) {
Set<? extends Element> elementsAnnotatedWith = roundEnv.getElementsAnnotatedWith(annotation);
for (Element element : elementsAnnotatedWith) {
if (!element.getModifiers().contains(Modifier.PUBLIC)) {
processingEnv.getMessager().printMessage(
Diagnostic.Kind.ERROR,
element.getSimpleName() + " must be public",
element
);
}
}
}
return false;
}
}

View File

@ -0,0 +1 @@
io.druid.annotations.SubclassesMustBePublicAnnotationProcessor

View File

@ -77,5 +77,6 @@ public final class EmptyDistinctCountBufferAggregator implements BufferAggregato
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// nothing to inspect
}
}

View File

@ -73,5 +73,6 @@ public final class EmptySketchBufferAggregator implements BufferAggregator
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// nothing to inspect
}
}

View File

@ -132,6 +132,22 @@
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<annotationProcessorPaths>
<path>
<groupId>io.druid</groupId>
<artifactId>druid-common</artifactId>
<version>${project.parent.version}</version>
</path>
</annotationProcessorPaths>
<annotationProcessors>
<annotationProcessor>io.druid.annotations.SubclassesMustBePublicAnnotationProcessor</annotationProcessor>
</annotationProcessors>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>

View File

@ -126,6 +126,7 @@ public interface BufferAggregator extends HotLoopCallee
@Override
default void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// nothing to inspect
}
/**

View File

@ -68,5 +68,6 @@ public class CountBufferAggregator implements BufferAggregator
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// nothing to inspect
}
}

View File

@ -25,7 +25,7 @@ import java.nio.ByteBuffer;
/**
*/
public class DoubleMaxBufferAggregator extends DoubleBufferAggregator
public class DoubleMaxBufferAggregator extends SimpleDoubleBufferAggregator
{
DoubleMaxBufferAggregator(FloatColumnSelector selector)
@ -40,8 +40,18 @@ public class DoubleMaxBufferAggregator extends DoubleBufferAggregator
}
@Override
public void aggregate(ByteBuffer buf, int position)
public void putFirst(ByteBuffer buf, int position, double value)
{
buf.putDouble(position, Math.max(buf.getDouble(position), (double) selector.get()));
if (!Double.isNaN(value)) {
buf.putDouble(position, value);
} else {
init(buf, position);
}
}
@Override
public void aggregate(ByteBuffer buf, int position, double value)
{
buf.putDouble(position, Math.max(buf.getDouble(position), value));
}
}

View File

@ -25,7 +25,7 @@ import java.nio.ByteBuffer;
/**
*/
public class DoubleMinBufferAggregator extends DoubleBufferAggregator
public class DoubleMinBufferAggregator extends SimpleDoubleBufferAggregator
{
DoubleMinBufferAggregator(FloatColumnSelector selector)
@ -40,8 +40,18 @@ public class DoubleMinBufferAggregator extends DoubleBufferAggregator
}
@Override
public void aggregate(ByteBuffer buf, int position)
public void putFirst(ByteBuffer buf, int position, double value)
{
buf.putDouble(position, Math.min(buf.getDouble(position), (double) selector.get()));
if (!Double.isNaN(value)) {
buf.putDouble(position, value);
} else {
init(buf, position);
}
}
@Override
public void aggregate(ByteBuffer buf, int position, double value)
{
buf.putDouble(position, Math.min(buf.getDouble(position), value));
}
}

View File

@ -25,7 +25,7 @@ import java.nio.ByteBuffer;
/**
*/
public class DoubleSumBufferAggregator extends DoubleBufferAggregator
public class DoubleSumBufferAggregator extends SimpleDoubleBufferAggregator
{
DoubleSumBufferAggregator(FloatColumnSelector selector)
@ -40,8 +40,14 @@ public class DoubleSumBufferAggregator extends DoubleBufferAggregator
}
@Override
public void aggregate(ByteBuffer buf, int position)
public void putFirst(ByteBuffer buf, int position, double value)
{
buf.putDouble(position, buf.getDouble(position) + (double) selector.get());
buf.putDouble(position, value);
}
@Override
public void aggregate(ByteBuffer buf, int position, double value)
{
buf.putDouble(position, buf.getDouble(position) + value);
}
}

View File

@ -73,5 +73,6 @@ public final class NoopBufferAggregator implements BufferAggregator
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// nothing to inspect
}
}

View File

@ -19,34 +19,57 @@
package io.druid.query.aggregation;
import io.druid.query.monomorphicprocessing.CalledFromHotLoop;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.FloatColumnSelector;
import java.nio.ByteBuffer;
public abstract class DoubleBufferAggregator implements BufferAggregator
public abstract class SimpleDoubleBufferAggregator implements BufferAggregator
{
protected final FloatColumnSelector selector;
DoubleBufferAggregator(FloatColumnSelector selector)
SimpleDoubleBufferAggregator(FloatColumnSelector selector)
{
this.selector = selector;
}
public FloatColumnSelector getSelector()
{
return selector;
}
/**
* Faster equivalent to
* aggregator.init(buf, position);
* aggregator.aggregate(buf, position, value);
*/
@CalledFromHotLoop
public abstract void putFirst(ByteBuffer buf, int position, double value);
@CalledFromHotLoop
public abstract void aggregate(ByteBuffer buf, int position, double value);
@Override
public Object get(ByteBuffer buf, int position)
public final void aggregate(ByteBuffer buf, int position)
{
aggregate(buf, position, (double) selector.get());
}
@Override
public final Object get(ByteBuffer buf, int position)
{
return buf.getDouble(position);
}
@Override
public float getFloat(ByteBuffer buf, int position)
public final float getFloat(ByteBuffer buf, int position)
{
return (float) buf.getDouble(position);
}
@Override
public long getLong(ByteBuffer buf, int position)
public final long getLong(ByteBuffer buf, int position)
{
return (long) buf.getDouble(position);
}

View File

@ -31,7 +31,7 @@ import io.druid.segment.IdLookup;
import io.druid.segment.data.ArrayBasedIndexedInts;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.filter.BooleanValueMatcher;
import it.unimi.dsi.fastutil.ints.Int2IntMap;
import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
import javax.annotation.Nullable;
import java.util.BitSet;
@ -40,14 +40,18 @@ final class ForwardingFilteredDimensionSelector implements DimensionSelector, Id
{
private final DimensionSelector selector;
private final IdLookup baseIdLookup;
private final Int2IntMap forwardMapping;
private final Int2IntOpenHashMap forwardMapping;
private final int[] reverseMapping;
/**
* @param selector must return true from {@link DimensionSelector#nameLookupPossibleInAdvance()}
* @param forwardMapping must have {@link Int2IntMap#defaultReturnValue(int)} configured to -1.
* @param forwardMapping must have {@link Int2IntOpenHashMap#defaultReturnValue(int)} configured to -1.
*/
ForwardingFilteredDimensionSelector(DimensionSelector selector, Int2IntMap forwardMapping, int[] reverseMapping)
ForwardingFilteredDimensionSelector(
DimensionSelector selector,
Int2IntOpenHashMap forwardMapping,
int[] reverseMapping
)
{
this.selector = Preconditions.checkNotNull(selector);
if (!selector.nameLookupPossibleInAdvance()) {
@ -106,6 +110,12 @@ final class ForwardingFilteredDimensionSelector implements DimensionSelector, Id
// null should match empty rows in multi-value columns
return nullRow && value == null;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
}
};
} else {
return BooleanValueMatcher.of(false);
@ -141,6 +151,12 @@ final class ForwardingFilteredDimensionSelector implements DimensionSelector, Id
// null should match empty rows in multi-value columns
return nullRow && matchNull;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
}
};
}
@ -179,6 +195,5 @@ final class ForwardingFilteredDimensionSelector implements DimensionSelector, Id
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
inspector.visit("forwardMapping", forwardMapping);
}
}

View File

@ -28,7 +28,6 @@ import io.druid.java.util.common.StringUtils;
import io.druid.query.filter.DimFilterUtils;
import io.druid.segment.DimensionSelector;
import io.druid.segment.IdLookup;
import it.unimi.dsi.fastutil.ints.Int2IntMap;
import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
import javax.annotation.Nullable;
@ -93,7 +92,7 @@ public class ListFilteredDimensionSpec extends BaseFilteredDimensionSpec
}
final int maxPossibleFilteredCardinality = values.size();
int count = 0;
final Int2IntMap forwardMapping = new Int2IntOpenHashMap(maxPossibleFilteredCardinality);
final Int2IntOpenHashMap forwardMapping = new Int2IntOpenHashMap(maxPossibleFilteredCardinality);
forwardMapping.defaultReturnValue(-1);
final int[] reverseMapping = new int[maxPossibleFilteredCardinality];
IdLookup idLookup = selector.idLookup();
@ -134,7 +133,7 @@ public class ListFilteredDimensionSpec extends BaseFilteredDimensionSpec
}
final int maxPossibleFilteredCardinality = selectorCardinality;
int count = 0;
final Int2IntMap forwardMapping = new Int2IntOpenHashMap(maxPossibleFilteredCardinality);
final Int2IntOpenHashMap forwardMapping = new Int2IntOpenHashMap(maxPossibleFilteredCardinality);
forwardMapping.defaultReturnValue(-1);
final int[] reverseMapping = new int[maxPossibleFilteredCardinality];
for (int i = 0; i < selectorCardinality; i++) {

View File

@ -79,6 +79,13 @@ final class PredicateFilteredDimensionSelector implements DimensionSelector
// null should match empty rows in multi-value columns
return nullRow && value == null;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// PredicateFilteredDimensionSelector.this inspects selector and predicate as well.
inspector.visit("selector", PredicateFilteredDimensionSelector.this);
}
};
}
@ -106,6 +113,14 @@ final class PredicateFilteredDimensionSelector implements DimensionSelector
// null should match empty rows in multi-value columns
return nullRow && matchNull;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// PredicateFilteredDimensionSelector.this inspects selector and predicate as well.
inspector.visit("selector", PredicateFilteredDimensionSelector.this);
inspector.visit("matcherPredicate", matcherPredicate);
}
};
}

View File

@ -83,7 +83,7 @@ public class RegexFilteredDimensionSpec extends BaseFilteredDimensionSpec
}
int count = 0;
final Int2IntMap forwardMapping = new Int2IntOpenHashMap();
final Int2IntOpenHashMap forwardMapping = new Int2IntOpenHashMap();
forwardMapping.defaultReturnValue(-1);
for (int i = 0; i < selectorCardinality; i++) {
if (compiledRegex.matcher(Strings.nullToEmpty(selector.lookupName(i))).matches()) {

View File

@ -19,6 +19,7 @@
package io.druid.query.filter;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.DimensionHandlerUtils;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.filter.BooleanValueMatcher;
@ -41,6 +42,12 @@ public class FloatValueMatcherColumnSelectorStrategy implements ValueMatcherColu
{
return Float.floatToIntBits(selector.get()) == matchValIntBits;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
}
};
}
@ -57,6 +64,13 @@ public class FloatValueMatcherColumnSelectorStrategy implements ValueMatcherColu
{
return predicate.applyFloat(selector.get());
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
inspector.visit("predicate", predicate);
}
};
}

View File

@ -19,6 +19,7 @@
package io.druid.query.filter;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.DimensionHandlerUtils;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.filter.BooleanValueMatcher;
@ -40,6 +41,12 @@ public class LongValueMatcherColumnSelectorStrategy implements ValueMatcherColum
{
return selector.get() == matchValLong;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
}
};
}
@ -56,6 +63,13 @@ public class LongValueMatcherColumnSelectorStrategy implements ValueMatcherColum
{
return predicate.applyLong(selector.get());
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
inspector.visit("predicate", predicate);
}
};
}

View File

@ -19,9 +19,13 @@
package io.druid.query.filter;
import io.druid.query.monomorphicprocessing.CalledFromHotLoop;
import io.druid.query.monomorphicprocessing.HotLoopCallee;
/**
*/
public interface ValueMatcher
public interface ValueMatcher extends HotLoopCallee
{
public boolean matches();
@CalledFromHotLoop
boolean matches();
}

View File

@ -34,6 +34,7 @@ import io.druid.segment.FloatColumnSelector;
import io.druid.segment.IdLookup;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.SingleValueDimensionSelector;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ColumnCapabilitiesImpl;
@ -105,7 +106,7 @@ public class RowBasedColumnSelectorFactory implements ColumnSelectorFactory
throw new UnsupportedOperationException("time dimension must provide an extraction function");
}
return new DimensionSelector()
return new SingleValueDimensionSelector()
{
@Override
public IndexedInts getRow()
@ -113,6 +114,12 @@ public class RowBasedColumnSelectorFactory implements ColumnSelectorFactory
return ZeroIndexedInts.instance();
}
@Override
public int getRowValue()
{
return 0;
}
@Override
public ValueMatcher makeValueMatcher(final String value)
{
@ -124,6 +131,13 @@ public class RowBasedColumnSelectorFactory implements ColumnSelectorFactory
String rowValue = extractionFn.apply(row.get().getTimestampFromEpoch());
return Objects.equals(rowValue, value);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("row", row);
inspector.visit("extractionFn", extractionFn);
}
};
}
@ -138,6 +152,14 @@ public class RowBasedColumnSelectorFactory implements ColumnSelectorFactory
String rowValue = extractionFn.apply(row.get().getTimestampFromEpoch());
return predicate.apply(rowValue);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("row", row);
inspector.visit("extractionFn", extractionFn);
inspector.visit("predicate", predicate);
}
};
}
@ -204,6 +226,12 @@ public class RowBasedColumnSelectorFactory implements ColumnSelectorFactory
}
return false;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("row", row);
}
};
} else {
return new ValueMatcher()
@ -223,6 +251,13 @@ public class RowBasedColumnSelectorFactory implements ColumnSelectorFactory
}
return false;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("row", row);
inspector.visit("extractionFn", extractionFn);
}
};
}
}
@ -249,6 +284,13 @@ public class RowBasedColumnSelectorFactory implements ColumnSelectorFactory
}
return false;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("row", row);
inspector.visit("predicate", predicate);
}
};
} else {
return new ValueMatcher()
@ -268,6 +310,13 @@ public class RowBasedColumnSelectorFactory implements ColumnSelectorFactory
}
return false;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("row", row);
inspector.visit("predicate", predicate);
}
};
}
}

View File

@ -37,7 +37,7 @@ public interface HotLoopCallee
* this instance (the instance on which inspectRuntimeShape() is called) is configured.
* d. ByteBuffer or similar objects, where byte order matters
* e. boolean flags, affecting branch taking
* f. Arrays of objects, meeting conditions any of conditions a-e.
* f. Arrays of objects, meeting any of conditions a-e.
*/
void inspectRuntimeShape(RuntimeShapeInspector inspector);
}

View File

@ -100,6 +100,7 @@ public final class SpecializationService
* specialization takes some JVM memory (machine code cache, byte code, etc.)
*/
private static final int maxSpecializations = Integer.getInteger("maxSpecializations", 1000);
private static final AtomicBoolean maxSpecializationsWarningEmitted = new AtomicBoolean(false);
private static final ExecutorService classSpecializationExecutor = Execs.singleThreaded("class-specialization-%d");
@ -124,7 +125,7 @@ public final class SpecializationService
String runtimeShape
)
{
return getSpecializationState(prototypeClass, runtimeShape, ImmutableMap.<Class<?>, Class<?>>of());
return getSpecializationState(prototypeClass, runtimeShape, ImmutableMap.of());
}
/**
@ -292,6 +293,8 @@ public final class SpecializationService
@Override
public T getSpecialized()
{
// Returns null because the class is not yet specialized. The purpose of WindowedLoopIterationCounter is to decide
// whether specialization should be done, or not.
return null;
}
@ -329,7 +332,7 @@ public final class SpecializationService
}
}
if (!currentMinutePresent) {
perMinuteIterations.computeIfAbsent(currentMinute, AtomicLong::new).addAndGet(newIterations);
perMinuteIterations.computeIfAbsent(currentMinute, m -> new AtomicLong()).addAndGet(newIterations);
totalIterations += newIterations;
}
return totalIterations;
@ -346,18 +349,33 @@ public final class SpecializationService
// PerPrototypeClassState.specializationStates. But it might be that nobody ever hits even the current
// maxSpecializations limit, so implementing cache eviction is an unnecessary complexity.
specialized = perPrototypeClassState.prototypeClass.newInstance();
LOG.warn(
"SpecializationService couldn't make more than [%d] specializations. "
+ "Not doing specialization for runtime shape[%s] and class remapping[%s], using the prototype class[%s]",
maxSpecializations,
specializationId.runtimeShape,
specializationId.classRemapping,
perPrototypeClassState.prototypeClass
);
if (!maxSpecializationsWarningEmitted.get() && maxSpecializationsWarningEmitted.compareAndSet(false, true)) {
LOG.warn(
"SpecializationService couldn't make more than [%d] specializations. " +
"Not doing specialization for runtime shape[%s] and class remapping[%s], using the prototype class[%s]",
maxSpecializations,
specializationId.runtimeShape,
specializationId.classRemapping,
perPrototypeClassState.prototypeClass
);
}
} else if (fakeSpecialize) {
specialized = perPrototypeClassState.prototypeClass.newInstance();
LOG.info(
"Not specializing prototype class[%s] for runtime shape[%s] and class remapping[%s] because "
+ "fakeSpecialize=true, using the prototype class instead",
perPrototypeClassState.prototypeClass,
specializationId.runtimeShape,
specializationId.classRemapping
);
} else {
specialized = perPrototypeClassState.specialize(specializationId.classRemapping);
LOG.info(
"Specializing prototype class[%s] for runtime shape[%s] and class remapping[%s]",
perPrototypeClassState.prototypeClass,
specializationId.runtimeShape,
specializationId.classRemapping
);
}
perPrototypeClassState.specializationStates.put(specializationId, new Specialized<>(specialized));
}

View File

@ -0,0 +1,48 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.topn;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.ColumnValueSelector;
import io.druid.segment.historical.HistoricalCursor;
import io.druid.segment.historical.HistoricalDimensionSelector;
import java.nio.ByteBuffer;
public interface Historical1AggPooledTopNScanner<
DimensionSelectorType extends HistoricalDimensionSelector,
MetricSelectorType extends ColumnValueSelector,
BufferAggregatorType extends BufferAggregator>
{
/**
* @param aggregatorSize number of bytes required by aggregator for a single aggregation
* @param positions a cache for positions in resultsBuffer, where specific (indexed) dimension values are aggregated
* @return number of scanned rows, i. e. number of steps made with the given cursor
*/
long scanAndAggregate(
DimensionSelectorType dimensionSelector,
MetricSelectorType metricSelector,
BufferAggregatorType aggregator,
int aggregatorSize,
HistoricalCursor cursor,
int[] positions,
ByteBuffer resultsBuffer
);
}

View File

@ -0,0 +1,74 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.topn;
import io.druid.query.aggregation.SimpleDoubleBufferAggregator;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.Offset;
import io.druid.segment.historical.HistoricalCursor;
import io.druid.segment.historical.HistoricalDimensionSelector;
import io.druid.segment.historical.HistoricalFloatColumnSelector;
import java.nio.ByteBuffer;
public class Historical1SimpleDoubleAggPooledTopNScannerPrototype
implements Historical1AggPooledTopNScanner<
HistoricalDimensionSelector,
HistoricalFloatColumnSelector,
SimpleDoubleBufferAggregator
>
{
@Override
public long scanAndAggregate(
HistoricalDimensionSelector dimensionSelector,
HistoricalFloatColumnSelector metricSelector,
SimpleDoubleBufferAggregator aggregator,
int aggregatorSize,
HistoricalCursor cursor,
int[] positions,
ByteBuffer resultsBuffer
)
{
// See TopNUtils.copyOffset() for explanation
Offset offset = (Offset) TopNUtils.copyOffset(cursor);
long scannedRows = 0;
int positionToAllocate = 0;
while (offset.withinBounds() && !Thread.currentThread().isInterrupted()) {
int rowNum = offset.getOffset();
double metric = metricSelector.get(rowNum);
final IndexedInts dimValues = dimensionSelector.getRow(rowNum);
final int dimSize = dimValues.size();
for (int i = 0; i < dimSize; i++) {
int dimIndex = dimValues.get(i);
int position = positions[dimIndex];
if (position >= 0) {
aggregator.aggregate(resultsBuffer, position, metric);
} else if (position == TopNAlgorithm.INIT_POSITION_VALUE) {
positions[dimIndex] = positionToAllocate;
aggregator.putFirst(resultsBuffer, positionToAllocate, metric);
positionToAllocate += aggregatorSize;
}
}
scannedRows++;
offset.increment();
}
return scannedRows;
}
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.topn;
import io.druid.query.aggregation.SimpleDoubleBufferAggregator;
import io.druid.segment.data.Offset;
import io.druid.segment.historical.HistoricalCursor;
import io.druid.segment.historical.HistoricalFloatColumnSelector;
import io.druid.segment.historical.SingleValueHistoricalDimensionSelector;
import java.nio.ByteBuffer;
public class HistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopNScannerPrototype
implements Historical1AggPooledTopNScanner<
SingleValueHistoricalDimensionSelector,
HistoricalFloatColumnSelector,
SimpleDoubleBufferAggregator
>
{
@Override
public long scanAndAggregate(
SingleValueHistoricalDimensionSelector dimensionSelector,
HistoricalFloatColumnSelector metricSelector,
SimpleDoubleBufferAggregator aggregator,
int aggregatorSize,
HistoricalCursor cursor,
int[] positions,
ByteBuffer resultsBuffer
)
{
// See TopNUtils.copyOffset() for explanation
Offset offset = (Offset) TopNUtils.copyOffset(cursor);
long scannedRows = 0;
int positionToAllocate = 0;
while (offset.withinBounds() && !Thread.currentThread().isInterrupted()) {
int rowNum = offset.getOffset();
int dimIndex = dimensionSelector.getRowValue(rowNum);
int position = positions[dimIndex];
if (position >= 0) {
aggregator.aggregate(resultsBuffer, position, metricSelector.get(rowNum));
} else if (position == TopNAlgorithm.INIT_POSITION_VALUE) {
positions[dimIndex] = positionToAllocate;
aggregator.putFirst(resultsBuffer, positionToAllocate, metricSelector.get(rowNum));
positionToAllocate += aggregatorSize;
}
scannedRows++;
offset.increment();
}
return scannedRows;
}
}

View File

@ -21,6 +21,7 @@ package io.druid.query.topn;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidPool;
import io.druid.java.util.common.Pair;
@ -28,6 +29,7 @@ import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.query.BaseQuery;
import io.druid.query.ColumnSelectorPlus;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.aggregation.SimpleDoubleBufferAggregator;
import io.druid.query.monomorphicprocessing.SpecializationService;
import io.druid.query.monomorphicprocessing.SpecializationState;
import io.druid.query.monomorphicprocessing.StringRuntimeShape;
@ -36,29 +38,156 @@ import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.Offset;
import io.druid.segment.historical.HistoricalCursor;
import io.druid.segment.historical.HistoricalDimensionSelector;
import io.druid.segment.historical.HistoricalFloatColumnSelector;
import io.druid.segment.historical.SingleValueHistoricalDimensionSelector;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
*/
public class PooledTopNAlgorithm
extends BaseTopNAlgorithm<int[], BufferAggregator[], PooledTopNAlgorithm.PooledTopNParams>
{
/** Non-final fields for testing, see TopNQueryRunnerTest */
@VisibleForTesting
static boolean specializeGeneric1AggPooledTopN =
private static boolean specializeGeneric1AggPooledTopN =
!Boolean.getBoolean("dontSpecializeGeneric1AggPooledTopN");
@VisibleForTesting
static boolean specializeGeneric2AggPooledTopN =
private static boolean specializeGeneric2AggPooledTopN =
!Boolean.getBoolean("dontSpecializeGeneric2AggPooledTopN");
private static boolean specializeHistorical1SimpleDoubleAggPooledTopN =
!Boolean.getBoolean("dontSpecializeHistorical1SimpleDoubleAggPooledTopN");
private static boolean specializeHistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopN =
!Boolean.getBoolean("dontSpecializeHistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopN");
/** See TopNQueryRunnerTest */
@VisibleForTesting
static void setSpecializeGeneric1AggPooledTopN(boolean value)
{
PooledTopNAlgorithm.specializeGeneric1AggPooledTopN = value;
computeSpecializedScanAndAggregateImplementations();
}
@VisibleForTesting
static void setSpecializeGeneric2AggPooledTopN(boolean value)
{
PooledTopNAlgorithm.specializeGeneric2AggPooledTopN = value;
computeSpecializedScanAndAggregateImplementations();
}
@VisibleForTesting
static void setSpecializeHistorical1SimpleDoubleAggPooledTopN(boolean value)
{
PooledTopNAlgorithm.specializeHistorical1SimpleDoubleAggPooledTopN = value;
computeSpecializedScanAndAggregateImplementations();
}
@VisibleForTesting
static void setSpecializeHistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopN(boolean value)
{
PooledTopNAlgorithm.specializeHistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopN = value;
computeSpecializedScanAndAggregateImplementations();
}
private static final Generic1AggPooledTopNScanner defaultGeneric1AggScanner =
new Generic1AggPooledTopNScannerPrototype();
private static final Generic2AggPooledTopNScanner defaultGeneric2AggScanner =
new Generic2AggPooledTopNScannerPrototype();
private static final Historical1AggPooledTopNScanner defaultHistorical1SimpleDoubleAggScanner =
new Historical1SimpleDoubleAggPooledTopNScannerPrototype();
private static final
Historical1AggPooledTopNScanner defaultHistoricalSingleValueDimSelector1SimpleDoubleAggScanner =
new HistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopNScannerPrototype();
private interface ScanAndAggregate
{
/**
* If this implementation of ScanAndAggregate is executable with the given parameters, run it and return true.
* Otherwise return false (scanning and aggregation is not done).
*/
boolean scanAndAggregate(
final PooledTopNParams params,
final int[] positions,
final BufferAggregator[] theAggregators
);
}
private static final List<ScanAndAggregate> specializedScanAndAggregateImplementations = new ArrayList<>();
static {
computeSpecializedScanAndAggregateImplementations();
}
private static void computeSpecializedScanAndAggregateImplementations()
{
specializedScanAndAggregateImplementations.clear();
// The order of the following `if` blocks matters, "more specialized" implementations go first
if (specializeHistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopN) {
specializedScanAndAggregateImplementations.add((params, positions, theAggregators) -> {
if (theAggregators.length == 1) {
BufferAggregator aggregator = theAggregators[0];
final Cursor cursor = params.getCursor();
if (cursor instanceof HistoricalCursor && aggregator instanceof SimpleDoubleBufferAggregator) {
if (params.getDimSelector() instanceof SingleValueHistoricalDimensionSelector &&
((SimpleDoubleBufferAggregator) aggregator).getSelector() instanceof HistoricalFloatColumnSelector) {
scanAndAggregateHistorical1SimpleDoubleAgg(
params,
positions,
(SimpleDoubleBufferAggregator) aggregator,
(HistoricalCursor) cursor,
defaultHistoricalSingleValueDimSelector1SimpleDoubleAggScanner
);
return true;
}
}
}
return false;
});
}
if (specializeHistorical1SimpleDoubleAggPooledTopN) {
specializedScanAndAggregateImplementations.add((params, positions, theAggregators) -> {
if (theAggregators.length == 1) {
BufferAggregator aggregator = theAggregators[0];
final Cursor cursor = params.getCursor();
if (cursor instanceof HistoricalCursor && aggregator instanceof SimpleDoubleBufferAggregator) {
if (params.getDimSelector() instanceof HistoricalDimensionSelector &&
((SimpleDoubleBufferAggregator) aggregator).getSelector() instanceof HistoricalFloatColumnSelector) {
scanAndAggregateHistorical1SimpleDoubleAgg(
params,
positions,
(SimpleDoubleBufferAggregator) aggregator,
(HistoricalCursor) cursor,
defaultHistorical1SimpleDoubleAggScanner
);
return true;
}
}
}
return false;
});
}
if (specializeGeneric1AggPooledTopN) {
specializedScanAndAggregateImplementations.add((params, positions, theAggregators) -> {
if (theAggregators.length == 1) {
scanAndAggregateGeneric1Agg(params, positions, theAggregators[0], params.getCursor());
return true;
}
return false;
});
}
if (specializeGeneric2AggPooledTopN) {
specializedScanAndAggregateImplementations.add((params, positions, theAggregators) -> {
if (theAggregators.length == 2) {
scanAndAggregateGeneric2Agg(params, positions, theAggregators, params.getCursor());
return true;
}
return false;
});
}
}
private final Capabilities capabilities;
private final TopNQuery query;
private final StupidPool<ByteBuffer> bufferPool;
private static final int AGG_UNROLL_COUNT = 8; // Must be able to fit loop below
@ -70,8 +199,6 @@ public class PooledTopNAlgorithm
)
{
super(capabilities);
this.capabilities = capabilities;
this.query = query;
this.bufferPool = bufferPool;
}
@ -193,17 +320,45 @@ public class PooledTopNAlgorithm
final int numProcessed
)
{
final Cursor cursor = params.getCursor();
if (specializeGeneric1AggPooledTopN && theAggregators.length == 1) {
scanAndAggregateGeneric1Agg(params, positions, theAggregators[0], cursor);
} else if (specializeGeneric2AggPooledTopN && theAggregators.length == 2) {
scanAndAggregateGeneric2Agg(params, positions, theAggregators, cursor);
} else {
scanAndAggregateDefault(params, positions, theAggregators);
for (ScanAndAggregate specializedScanAndAggregate : specializedScanAndAggregateImplementations) {
if (specializedScanAndAggregate.scanAndAggregate(params, positions, theAggregators)) {
BaseQuery.checkInterrupted();
return;
}
}
scanAndAggregateDefault(params, positions, theAggregators);
BaseQuery.checkInterrupted();
}
private static void scanAndAggregateHistorical1SimpleDoubleAgg(
PooledTopNParams params,
int[] positions,
SimpleDoubleBufferAggregator aggregator,
HistoricalCursor cursor,
Historical1AggPooledTopNScanner prototypeScanner
)
{
String runtimeShape = StringRuntimeShape.of(aggregator);
SpecializationState<Historical1AggPooledTopNScanner> specializationState =
SpecializationService.getSpecializationState(
prototypeScanner.getClass(),
runtimeShape,
ImmutableMap.of(Offset.class, cursor.getOffset().getClass())
);
Historical1AggPooledTopNScanner scanner = specializationState.getSpecializedOrDefault(prototypeScanner);
long scannedRows = scanner.scanAndAggregate(
(HistoricalDimensionSelector) params.getDimSelector(),
aggregator.getSelector(),
aggregator,
params.getAggregatorSizes()[0],
cursor,
positions,
params.getResultsBuf()
);
specializationState.accountLoopIterations(scannedRows);
}
private static void scanAndAggregateGeneric1Agg(
PooledTopNParams params,
int[] positions,

View File

@ -0,0 +1,45 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.topn;
import io.druid.segment.historical.HistoricalCursor;
final class TopNUtils
{
/**
* Returns Object, so javac couldn't remove cast in methods like
* {@link HistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopNScannerPrototype#scanAndAggregate}. That cast is
* needed, because when TopNScannerPrototype is specialized, occurrences of {@link io.druid.segment.data.Offset} are
* replaced with the specific Offset subtype in the TopNScannerPrototype bytecode, via {@link
* io.druid.query.monomorphicprocessing.SpecializationService#getSpecializationState(Class, String,
* com.google.common.collect.ImmutableMap)}, providing ImmutableMap.of(Offset.class, specificOffsetSubtype) as the
* classRemapping argument.
*
* Casting to the specific Offset subtype helps Hotspot JIT (OpenJDK 8) to generate better assembly. It shouldn't be
* so, because the Offset subtype is still always the same (otherwise cast wouldn't be possible), so JIT should
* generate equivalent code. In OpenJDK 9 Hotspot could be improved and this "casting hack" is not needed anymore.
*/
static Object copyOffset(HistoricalCursor cursor)
{
return cursor.getOffset().clone();
}
private TopNUtils() {}
}

View File

@ -36,7 +36,7 @@ import java.util.HashSet;
/**
*/
public class BitmapOffset implements Offset
public class BitmapOffset extends Offset
{
private static final int INVALID_VALUE = -1;
private static final BitmapFactory ROARING_BITMAP_FACTORY = new RoaringBitmapSerdeFactory(false).getBitmapFactory();

View File

@ -24,6 +24,7 @@ import io.druid.collections.bitmap.BitmapFactory;
import io.druid.collections.bitmap.ImmutableBitmap;
import io.druid.collections.spatial.ImmutableRTree;
import io.druid.query.filter.BitmapIndexSelector;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.column.BitmapIndex;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
@ -99,6 +100,12 @@ public class ColumnSelectorBitmapIndexSelector implements BitmapIndexSelector
{
return IndexedIterable.create(this).iterator();
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("column", column);
}
};
}

View File

@ -21,6 +21,7 @@ package io.druid.segment;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.CompressedVSizeIntsIndexedSupplier;
import io.druid.segment.data.IndexedInts;
@ -216,6 +217,12 @@ public class CompressedVSizeIndexedSupplier implements WritableSupplier<IndexedM
{
return new IndexedIntsIterator(this);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("values", values);
}
};
}
@ -231,6 +238,12 @@ public class CompressedVSizeIndexedSupplier implements WritableSupplier<IndexedM
return IndexedIterable.create(this).iterator();
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("offsets", offsets);
inspector.visit("values", values);
}
}
}

View File

@ -23,6 +23,7 @@ import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import io.druid.java.util.common.IAE;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.filter.BooleanValueMatcher;
@ -81,6 +82,12 @@ public final class DimensionSelectorUtils
return false;
}
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
}
};
} else {
if (matchNull) {
@ -93,6 +100,12 @@ public final class DimensionSelectorUtils
final int size = row.size();
return size == 0;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
}
};
} else {
return BooleanValueMatcher.of(false);
@ -124,6 +137,12 @@ public final class DimensionSelectorUtils
return false;
}
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
}
};
}
@ -169,6 +188,12 @@ public final class DimensionSelectorUtils
return false;
}
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
}
};
}
@ -197,6 +222,13 @@ public final class DimensionSelectorUtils
return false;
}
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
inspector.visit("predicate", predicate);
}
};
}

View File

@ -0,0 +1,206 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
import io.druid.collections.bitmap.ImmutableBitmap;
import io.druid.query.BaseQuery;
import io.druid.query.filter.BooleanFilter;
import io.druid.query.filter.Filter;
import io.druid.query.filter.RowOffsetMatcherFactory;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.data.Offset;
import io.druid.segment.filter.BooleanValueMatcher;
import io.druid.segment.historical.HistoricalCursor;
import io.druid.segment.historical.OffsetHolder;
import org.roaringbitmap.IntIterator;
public final class FilteredOffset extends Offset
{
private Offset baseOffset;
private final ValueMatcher filterMatcher;
FilteredOffset(
HistoricalCursor cursor,
boolean descending,
Filter postFilter,
ColumnSelectorBitmapIndexSelector bitmapIndexSelector
)
{
RowOffsetMatcherFactory rowOffsetMatcherFactory = new CursorOffsetHolderRowOffsetMatcherFactory(
cursor,
descending
);
if (postFilter instanceof BooleanFilter) {
filterMatcher = ((BooleanFilter) postFilter).makeMatcher(
bitmapIndexSelector,
cursor,
rowOffsetMatcherFactory
);
} else {
if (postFilter.supportsBitmapIndex(bitmapIndexSelector)) {
filterMatcher = rowOffsetMatcherFactory.makeRowOffsetMatcher(
postFilter.getBitmapIndex(bitmapIndexSelector)
);
} else {
filterMatcher = postFilter.makeMatcher(cursor);
}
}
}
void reset(Offset baseOffset)
{
this.baseOffset = baseOffset;
if (baseOffset.withinBounds()) {
if (!filterMatcher.matches()) {
BaseQuery.checkInterrupted();
incrementInterruptibly();
}
}
}
@Override
public void increment()
{
baseOffset.increment();
while (baseOffset.withinBounds() && !Thread.currentThread().isInterrupted()) {
if (filterMatcher.matches()) {
return;
} else {
baseOffset.increment();
}
}
}
void incrementInterruptibly()
{
baseOffset.increment();
while (baseOffset.withinBounds()) {
BaseQuery.checkInterrupted();
if (filterMatcher.matches()) {
return;
} else {
baseOffset.increment();
}
}
}
@Override
public boolean withinBounds()
{
return baseOffset.withinBounds();
}
@Override
public Offset clone()
{
FilteredOffset offset = (FilteredOffset) super.clone();
offset.baseOffset = offset.baseOffset.clone();
return offset;
}
@Override
public int getOffset()
{
return baseOffset.getOffset();
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("baseOffset", baseOffset);
inspector.visit("filterMatcher", filterMatcher);
}
private static class CursorOffsetHolderRowOffsetMatcherFactory implements RowOffsetMatcherFactory
{
private final OffsetHolder holder;
private final boolean descending;
CursorOffsetHolderRowOffsetMatcherFactory(OffsetHolder holder, boolean descending)
{
this.holder = holder;
this.descending = descending;
}
// Use an iterator-based implementation, ImmutableBitmap.get(index) works differently for Concise and Roaring.
// ImmutableConciseSet.get(index) is also inefficient, it performs a linear scan on each call
@Override
public ValueMatcher makeRowOffsetMatcher(final ImmutableBitmap rowBitmap)
{
final IntIterator iter = descending ?
BitmapOffset.getReverseBitmapOffsetIterator(rowBitmap) :
rowBitmap.iterator();
if (!iter.hasNext()) {
return BooleanValueMatcher.of(false);
}
if (descending) {
return new ValueMatcher()
{
int iterOffset = Integer.MAX_VALUE;
@Override
public boolean matches()
{
int currentOffset = holder.getOffset().getOffset();
while (iterOffset > currentOffset && iter.hasNext()) {
iterOffset = iter.next();
}
return iterOffset == currentOffset;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("holder", holder);
inspector.visit("iter", iter);
}
};
} else {
return new ValueMatcher()
{
int iterOffset = -1;
@Override
public boolean matches()
{
int currentOffset = holder.getOffset().getOffset();
while (iterOffset < currentOffset && iter.hasNext()) {
iterOffset = iter.next();
}
return iterOffset == currentOffset;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("holder", holder);
inspector.visit("iter", iter);
}
};
}
}
}
}

View File

@ -120,6 +120,7 @@ public class FloatDimensionIndexer implements DimensionIndexer<Float, Float, Flo
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// nothing to inspect
}
}
@ -150,6 +151,7 @@ public class FloatDimensionIndexer implements DimensionIndexer<Float, Float, Flo
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// nothing to inspect
}
}

View File

@ -22,7 +22,6 @@ package io.druid.segment;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
@ -44,13 +43,13 @@ import io.druid.common.guava.FileOutputSupplier;
import io.druid.common.guava.GuavaUtils;
import io.druid.common.utils.JodaUtils;
import io.druid.common.utils.SerializerUtils;
import io.druid.java.util.common.io.Closer;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.guava.FunctionalIterable;
import io.druid.java.util.common.guava.MergeIterable;
import io.druid.java.util.common.guava.nary.BinaryFn;
import io.druid.java.util.common.io.Closer;
import io.druid.java.util.common.io.smoosh.Smoosh;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.aggregation.AggregatorFactory;
@ -63,8 +62,6 @@ import io.druid.segment.data.CompressionFactory;
import io.druid.segment.data.GenericIndexed;
import io.druid.segment.data.IOPeon;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedIterable;
import io.druid.segment.data.ListIndexed;
import io.druid.segment.data.LongSupplierSerializer;
import io.druid.segment.data.TmpFileIOPeon;
import io.druid.segment.incremental.IncrementalIndex;
@ -86,7 +83,6 @@ import java.nio.IntBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
@ -102,10 +98,8 @@ public class IndexMerger
{
private static final Logger log = new Logger(IndexMerger.class);
protected static final ListIndexed EMPTY_STR_DIM_VAL = new ListIndexed<>(Collections.singletonList(""), String.class);
protected static final SerializerUtils serializerUtils = new SerializerUtils();
protected static final int INVALID_ROW = -1;
protected static final Splitter SPLITTER = Splitter.on(",");
protected final ObjectMapper mapper;
protected final IndexIO indexIO;
@ -1178,46 +1172,6 @@ public class IndexMerger
}
}
public static class AggFactoryStringIndexed implements Indexed<String>
{
private final AggregatorFactory[] metricAggs;
public AggFactoryStringIndexed(AggregatorFactory[] metricAggs)
{
this.metricAggs = metricAggs;
}
@Override
public Class<? extends String> getClazz()
{
return String.class;
}
@Override
public int size()
{
return metricAggs.length;
}
@Override
public String get(int index)
{
return metricAggs[index].getName();
}
@Override
public int indexOf(String value)
{
throw new UnsupportedOperationException();
}
@Override
public Iterator<String> iterator()
{
return IndexedIterable.create(this).iterator();
}
}
public static class RowboatMergeFunction implements BinaryFn<Rowboat, Rowboat, Rowboat>
{
private final AggregatorFactory[] metricAggs;

View File

@ -119,6 +119,7 @@ public class LongDimensionIndexer implements DimensionIndexer<Long, Long, Long>
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// nothing to inspect
}
}
@ -150,6 +151,7 @@ public class LongDimensionIndexer implements DimensionIndexer<Long, Long, Long>
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// nothing to inspect
}
}

View File

@ -26,10 +26,11 @@ import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.ZeroIndexedInts;
import io.druid.segment.filter.BooleanValueMatcher;
import io.druid.segment.historical.SingleValueHistoricalDimensionSelector;
import javax.annotation.Nullable;
public class NullDimensionSelector implements DimensionSelector, IdLookup
public class NullDimensionSelector implements SingleValueHistoricalDimensionSelector, IdLookup
{
private static final NullDimensionSelector INSTANCE = new NullDimensionSelector();
@ -49,6 +50,24 @@ public class NullDimensionSelector implements DimensionSelector, IdLookup
return ZeroIndexedInts.instance();
}
@Override
public int getRowValue()
{
return 0;
}
@Override
public int getRowValue(int offset)
{
return 0;
}
@Override
public IndexedInts getRow(int offset)
{
return getRow();
}
@Override
public ValueMatcher makeValueMatcher(String value)
{
@ -96,5 +115,6 @@ public class NullDimensionSelector implements DimensionSelector, IdLookup
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// nothing to inspect
}
}

View File

@ -28,6 +28,7 @@ import io.druid.java.util.common.io.Closer;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.column.BitmapIndex;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
@ -165,6 +166,12 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
{
return IndexedIterable.create(this).iterator();
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("dict", dict);
}
};
}

View File

@ -20,24 +20,18 @@
package io.druid.segment;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.druid.collections.bitmap.ImmutableBitmap;
import io.druid.java.util.common.io.Closer;
import io.druid.java.util.common.granularity.Granularity;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.io.Closer;
import io.druid.query.BaseQuery;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.extraction.ExtractionFn;
import io.druid.query.filter.BooleanFilter;
import io.druid.query.filter.Filter;
import io.druid.query.filter.RowOffsetMatcherFactory;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.column.BitmapIndex;
import io.druid.segment.column.Column;
@ -50,19 +44,16 @@ import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.Offset;
import io.druid.segment.filter.AndFilter;
import io.druid.segment.filter.BooleanValueMatcher;
import it.unimi.dsi.fastutil.ints.IntIterators;
import io.druid.segment.historical.HistoricalCursor;
import io.druid.segment.historical.HistoricalFloatColumnSelector;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.roaringbitmap.IntIterator;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
*/
@ -305,7 +296,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
postFilter,
selector
).build(),
Predicates.<Cursor>notNull()
Objects::nonNull
);
}
@ -320,7 +311,6 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
private static class CursorSequenceBuilder
{
private final StorageAdapter storageAdapter;
private final QueryableIndex index;
private final Interval interval;
private final VirtualColumns virtualColumns;
@ -345,7 +335,6 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
ColumnSelectorBitmapIndexSelector bitmapIndexSelector
)
{
this.storageAdapter = storageAdapter;
this.index = storageAdapter.index;
this.interval = interval;
this.virtualColumns = virtualColumns;
@ -418,11 +407,44 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
final Offset initOffset = offset.clone();
final DateTime myBucket = gran.toDateTime(inputInterval.getStartMillis());
final CursorOffsetHolder cursorOffsetHolder = new CursorOffsetHolder();
abstract class QueryableIndexBaseCursor implements Cursor
abstract class QueryableIndexBaseCursor<OffsetType extends Offset> implements HistoricalCursor
{
Offset cursorOffset;
OffsetType cursorOffset;
@Override
public OffsetType getOffset()
{
return cursorOffset;
}
@Override
public DateTime getTime()
{
return myBucket;
}
@Override
public void advanceTo(int offset)
{
int count = 0;
while (count < offset && !isDone()) {
advance();
count++;
}
}
@Override
public boolean isDone()
{
return !cursorOffset.withinBounds();
}
@Override
public boolean isDoneOrInterrupted()
{
return isDone() || Thread.currentThread().isInterrupted();
}
@Override
public DimensionSelector makeDimensionSelector(
@ -472,204 +494,13 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
}
final DictionaryEncodedColumn<String> column = cachedColumn;
abstract class QueryableDimensionSelector implements DimensionSelector, IdLookup
{
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("column", column);
inspector.visit("cursorOffset", cursorOffset);
inspector.visit("extractionFn", extractionFn);
}
}
if (column == null) {
return NullDimensionSelector.instance();
} else if (columnDesc.getCapabilities().hasMultipleValues()) {
class MultiValueDimensionSelector extends QueryableDimensionSelector
{
@Override
public IndexedInts getRow()
{
return column.getMultiValueRow(cursorOffset.getOffset());
}
@Override
public ValueMatcher makeValueMatcher(String value)
{
return DimensionSelectorUtils.makeValueMatcherGeneric(this, value);
}
@Override
public ValueMatcher makeValueMatcher(Predicate<String> predicate)
{
return DimensionSelectorUtils.makeValueMatcherGeneric(this, predicate);
}
@Override
public int getValueCardinality()
{
return column.getCardinality();
}
@Override
public String lookupName(int id)
{
final String value = column.lookupName(id);
return extractionFn == null ?
value :
extractionFn.apply(value);
}
@Override
public boolean nameLookupPossibleInAdvance()
{
return true;
}
@Nullable
@Override
public IdLookup idLookup()
{
return extractionFn == null ? this : null;
}
@Override
public int lookupId(String name)
{
if (extractionFn != null) {
throw new UnsupportedOperationException(
"cannot perform lookup when applying an extraction function"
);
}
return column.lookupId(name);
}
}
return new MultiValueDimensionSelector();
} else {
class SingleValueDimensionSelector extends QueryableDimensionSelector
{
@Override
public IndexedInts getRow()
{
// using an anonymous class is faster than creating a class that stores a copy of the value
return new IndexedInts()
{
@Override
public int size()
{
return 1;
}
@Override
public int get(int index)
{
return column.getSingleValueRow(cursorOffset.getOffset());
}
@Override
public it.unimi.dsi.fastutil.ints.IntIterator iterator()
{
return IntIterators.singleton(column.getSingleValueRow(cursorOffset.getOffset()));
}
@Override
public void fill(int index, int[] toFill)
{
throw new UnsupportedOperationException("fill not supported");
}
@Override
public void close() throws IOException
{
}
};
}
@Override
public ValueMatcher makeValueMatcher(final String value)
{
if (extractionFn == null) {
final int valueId = lookupId(value);
if (valueId >= 0) {
return new ValueMatcher()
{
@Override
public boolean matches()
{
return column.getSingleValueRow(cursorOffset.getOffset()) == valueId;
}
};
} else {
return BooleanValueMatcher.of(false);
}
} else {
// Employ precomputed BitSet optimization
return makeValueMatcher(Predicates.equalTo(value));
}
}
@Override
public ValueMatcher makeValueMatcher(final Predicate<String> predicate)
{
final BitSet predicateMatchingValueIds = DimensionSelectorUtils.makePredicateMatchingSet(
this,
predicate
);
return new ValueMatcher()
{
@Override
public boolean matches()
{
int rowValueId = column.getSingleValueRow(cursorOffset.getOffset());
return predicateMatchingValueIds.get(rowValueId);
}
};
}
@Override
public int getValueCardinality()
{
return column.getCardinality();
}
@Override
public String lookupName(int id)
{
final String value = column.lookupName(id);
return extractionFn == null ? value : extractionFn.apply(value);
}
@Override
public boolean nameLookupPossibleInAdvance()
{
return true;
}
@Nullable
@Override
public IdLookup idLookup()
{
return extractionFn == null ? this : null;
}
@Override
public int lookupId(String name)
{
if (extractionFn != null) {
throw new UnsupportedOperationException(
"cannot perform lookup when applying an extraction function"
);
}
return column.lookupId(name);
}
}
return new SingleValueDimensionSelector();
return column.makeDimensionSelector(this, extractionFn);
}
}
@Override
public FloatColumnSelector makeFloatColumnSelector(String columnName)
{
@ -694,7 +525,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
}
final GenericColumn metricVals = cachedMetricVals;
return new FloatColumnSelector()
return new HistoricalFloatColumnSelector()
{
@Override
public float get()
@ -702,6 +533,12 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
return metricVals.getFloatSingleValueRow(cursorOffset.getOffset());
}
@Override
public float get(int offset)
{
return metricVals.getFloatSingleValueRow(offset);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
@ -922,18 +759,12 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
}
if (postFilter == null) {
return new QueryableIndexBaseCursor()
return new QueryableIndexBaseCursor<Offset>()
{
{
reset();
}
@Override
public DateTime getTime()
{
return myBucket;
}
@Override
public void advance()
{
@ -947,139 +778,39 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
cursorOffset.increment();
}
@Override
public void advanceTo(int offset)
{
int count = 0;
while (count < offset && !isDone()) {
advance();
count++;
}
}
@Override
public boolean isDone()
{
return !cursorOffset.withinBounds();
}
@Override
public boolean isDoneOrInterrupted()
{
return isDone() || Thread.currentThread().isInterrupted();
}
@Override
public void reset()
{
cursorOffset = initOffset.clone();
cursorOffsetHolder.set(cursorOffset);
}
};
} else {
return new QueryableIndexBaseCursor()
return new QueryableIndexBaseCursor<FilteredOffset>()
{
RowOffsetMatcherFactory rowOffsetMatcherFactory = new CursorOffsetHolderRowOffsetMatcherFactory(
cursorOffsetHolder,
descending
);
final ValueMatcher filterMatcher;
{
if (postFilter instanceof BooleanFilter) {
filterMatcher = ((BooleanFilter) postFilter).makeMatcher(
bitmapIndexSelector,
this,
rowOffsetMatcherFactory
);
} else {
if (postFilter.supportsBitmapIndex(bitmapIndexSelector)) {
filterMatcher = rowOffsetMatcherFactory.makeRowOffsetMatcher(postFilter.getBitmapIndex(
bitmapIndexSelector));
} else {
filterMatcher = postFilter.makeMatcher(this);
}
}
}
{
cursorOffset = new FilteredOffset(this, descending, postFilter, bitmapIndexSelector);
reset();
}
@Override
public DateTime getTime()
{
return myBucket;
}
@Override
public void advance()
{
BaseQuery.checkInterrupted();
cursorOffset.increment();
while (!isDone()) {
BaseQuery.checkInterrupted();
if (filterMatcher.matches()) {
return;
} else {
cursorOffset.increment();
}
}
cursorOffset.incrementInterruptibly();
}
@Override
public void advanceUninterruptibly()
{
if (Thread.currentThread().isInterrupted()) {
return;
if (!Thread.currentThread().isInterrupted()) {
cursorOffset.increment();
}
cursorOffset.increment();
while (!isDoneOrInterrupted()) {
if (filterMatcher.matches()) {
return;
} else {
cursorOffset.increment();
}
}
}
@Override
public void advanceTo(int offset)
{
int count = 0;
while (count < offset && !isDone()) {
advance();
count++;
}
}
@Override
public boolean isDone()
{
return !cursorOffset.withinBounds();
}
@Override
public boolean isDoneOrInterrupted()
{
return isDone() || Thread.currentThread().isInterrupted();
}
@Override
public void reset()
{
cursorOffset = initOffset.clone();
cursorOffsetHolder.set(cursorOffset);
if (!isDone()) {
if (filterMatcher.matches()) {
return;
} else {
advance();
}
}
cursorOffset.reset(initOffset.clone());
}
};
}
@ -1092,83 +823,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
}
}
public static class CursorOffsetHolder
{
Offset currOffset = null;
public Offset get()
{
return currOffset;
}
public void set(Offset currOffset)
{
this.currOffset = currOffset;
}
}
private static class CursorOffsetHolderRowOffsetMatcherFactory implements RowOffsetMatcherFactory
{
private final CursorOffsetHolder holder;
private final boolean descending;
public CursorOffsetHolderRowOffsetMatcherFactory(CursorOffsetHolder holder, boolean descending)
{
this.holder = holder;
this.descending = descending;
}
// Use an iterator-based implementation, ImmutableBitmap.get(index) works differently for Concise and Roaring.
// ImmutableConciseSet.get(index) is also inefficient, it performs a linear scan on each call
@Override
public ValueMatcher makeRowOffsetMatcher(final ImmutableBitmap rowBitmap)
{
final IntIterator iter = descending ?
BitmapOffset.getReverseBitmapOffsetIterator(rowBitmap) :
rowBitmap.iterator();
if (!iter.hasNext()) {
return BooleanValueMatcher.of(false);
}
if (descending) {
return new ValueMatcher()
{
int iterOffset = Integer.MAX_VALUE;
@Override
public boolean matches()
{
int currentOffset = holder.get().getOffset();
while (iterOffset > currentOffset && iter.hasNext()) {
iterOffset = iter.next();
}
return iterOffset == currentOffset;
}
};
} else {
return new ValueMatcher()
{
int iterOffset = -1;
@Override
public boolean matches()
{
int currentOffset = holder.get().getOffset();
while (iterOffset < currentOffset && iter.hasNext()) {
iterOffset = iter.next();
}
return iterOffset == currentOffset;
}
};
}
}
}
private abstract static class TimestampCheckingOffset implements Offset
public abstract static class TimestampCheckingOffset extends Offset
{
protected final Offset baseOffset;
protected final GenericColumn timestamps;
@ -1230,7 +885,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
}
}
private static class AscendingTimestampCheckingOffset extends TimestampCheckingOffset
public static class AscendingTimestampCheckingOffset extends TimestampCheckingOffset
{
public AscendingTimestampCheckingOffset(
Offset baseOffset,
@ -1262,7 +917,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
}
}
private static class DescendingTimestampCheckingOffset extends TimestampCheckingOffset
public static class DescendingTimestampCheckingOffset extends TimestampCheckingOffset
{
public DescendingTimestampCheckingOffset(
Offset baseOffset,
@ -1295,7 +950,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
}
}
private static class NoFilterOffset implements Offset
public static class NoFilterOffset extends Offset
{
private final int rowCount;
private final boolean descending;

View File

@ -31,7 +31,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
public class SingleScanTimeDimSelector implements DimensionSelector
public class SingleScanTimeDimSelector implements SingleValueDimensionSelector
{
private final ExtractionFn extractionFn;
private final LongColumnSelector selector;
@ -64,6 +64,12 @@ public class SingleScanTimeDimSelector implements DimensionSelector
return new SingleIndexedInt(getDimensionValueIndex());
}
@Override
public int getRowValue()
{
return getDimensionValueIndex();
}
@Override
public ValueMatcher makeValueMatcher(final String value)
{
@ -74,6 +80,12 @@ public class SingleScanTimeDimSelector implements DimensionSelector
{
return Objects.equals(lookupName(getDimensionValueIndex()), value);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", SingleScanTimeDimSelector.this);
}
};
}
@ -87,6 +99,13 @@ public class SingleScanTimeDimSelector implements DimensionSelector
{
return predicate.apply(lookupName(getDimensionValueIndex()));
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", SingleScanTimeDimSelector.this);
inspector.visit("predicate", predicate);
}
};
}

View File

@ -17,48 +17,18 @@
* under the License.
*/
package io.druid.segment.data;
package io.druid.segment;
import it.unimi.dsi.fastutil.ints.IntIterator;
import java.io.IOException;
import java.util.List;
import io.druid.query.monomorphicprocessing.CalledFromHotLoop;
/**
* Specialization for {@link DimensionSelector}s, always having a single value in {@link #getRow()}.
*/
public class ListBasedIndexedInts implements IndexedInts
public interface SingleValueDimensionSelector extends DimensionSelector
{
private final List<Integer> expansion;
public ListBasedIndexedInts(List<Integer> expansion) {this.expansion = expansion;}
@Override
public int size()
{
return expansion.size();
}
@Override
public int get(int index)
{
return expansion.get(index);
}
@Override
public IntIterator iterator()
{
return new IndexedIntsIterator(this);
}
@Override
public void fill(int index, int[] toFill)
{
throw new UnsupportedOperationException("fill not supported");
}
@Override
public void close() throws IOException
{
}
/**
* Returns a single value of {@link #getRow()}.
*/
@CalledFromHotLoop
int getRowValue();
}

View File

@ -327,6 +327,12 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
{
return IndexedIterable.create(this).iterator();
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// nothing to inspect
}
};
}
@ -469,6 +475,12 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
}
return false;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// nothing to inspect
}
};
} else {
return BooleanValueMatcher.of(false);
@ -506,6 +518,12 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
}
return false;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// nothing to inspect
}
};
}

View File

@ -30,13 +30,14 @@ import io.druid.collections.bitmap.MutableBitmap;
import io.druid.collections.spatial.ImmutableRTree;
import io.druid.collections.spatial.RTree;
import io.druid.collections.spatial.split.LinearGutmanSplitStrategy;
import io.druid.java.util.common.io.Closer;
import io.druid.java.util.common.ByteBufferUtils;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.io.Closer;
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ColumnDescriptor;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.ArrayIndexed;
import io.druid.segment.data.BitmapSerdeFactory;
import io.druid.segment.data.ByteBufferWriter;
import io.druid.segment.data.CompressedObjectStrategy;
@ -49,7 +50,6 @@ import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.IndexedIntsWriter;
import io.druid.segment.data.IndexedRTree;
import io.druid.segment.data.ListIndexed;
import io.druid.segment.data.VSizeIndexedIntsWriter;
import io.druid.segment.data.VSizeIndexedWriter;
import io.druid.segment.serde.DictionaryEncodedColumnPartSerde;
@ -64,14 +64,13 @@ import java.io.IOException;
import java.nio.IntBuffer;
import java.nio.MappedByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class StringDimensionMergerV9 implements DimensionMergerV9<int[]>
{
private static final Logger log = new Logger(StringDimensionMergerV9.class);
protected static final ListIndexed EMPTY_STR_DIM_VAL = new ListIndexed<>(Collections.singletonList(""), String.class);
protected static final Indexed<String> EMPTY_STR_DIM_VAL = new ArrayIndexed<>(new String[]{""}, String.class);
protected static final int[] EMPTY_STR_DIM_ARRAY = new int[]{0};
protected static final Splitter SPLITTER = Splitter.on(",");

View File

@ -20,8 +20,9 @@
package io.druid.segment;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.historical.HistoricalFloatColumnSelector;
public final class ZeroFloatColumnSelector implements FloatColumnSelector
public final class ZeroFloatColumnSelector implements HistoricalFloatColumnSelector
{
private static final ZeroFloatColumnSelector INSTANCE = new ZeroFloatColumnSelector();
@ -41,8 +42,15 @@ public final class ZeroFloatColumnSelector implements FloatColumnSelector
return 0.0f;
}
@Override
public float get(int offset)
{
return get();
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// nothing to inspect
}
}

View File

@ -44,5 +44,6 @@ public final class ZeroLongColumnSelector implements LongColumnSelector
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// nothing to inspect
}
}

View File

@ -19,7 +19,10 @@
package io.druid.segment.column;
import io.druid.query.extraction.ExtractionFn;
import io.druid.segment.DimensionSelector;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.historical.OffsetHolder;
import java.io.Closeable;
@ -34,4 +37,6 @@ public interface DictionaryEncodedColumn<ActualType extends Comparable> extends
public ActualType lookupName(int id);
public int lookupId(ActualType name);
public int getCardinality();
DimensionSelector makeDimensionSelector(OffsetHolder offsetHolder, ExtractionFn extractionFn);
}

View File

@ -19,13 +19,27 @@
package io.druid.segment.column;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.base.Strings;
import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.query.extraction.ExtractionFn;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.DimensionSelectorUtils;
import io.druid.segment.IdLookup;
import io.druid.segment.data.CachingIndexed;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.IndexedMultivalue;
import io.druid.segment.data.SingleIndexedInt;
import io.druid.segment.filter.BooleanValueMatcher;
import io.druid.segment.historical.HistoricalDimensionSelector;
import io.druid.segment.historical.OffsetHolder;
import io.druid.segment.historical.SingleValueHistoricalDimensionSelector;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.BitSet;
/**
*/
@ -90,6 +104,184 @@ public class SimpleDictionaryEncodedColumn
return cachedLookups.size();
}
@Override
public HistoricalDimensionSelector makeDimensionSelector(
final OffsetHolder offsetHolder,
final ExtractionFn extractionFn
)
{
abstract class QueryableDimensionSelector implements HistoricalDimensionSelector, IdLookup
{
@Override
public int getValueCardinality()
{
return getCardinality();
}
@Override
public String lookupName(int id)
{
final String value = SimpleDictionaryEncodedColumn.this.lookupName(id);
return extractionFn == null ?
value :
extractionFn.apply(value);
}
@Override
public boolean nameLookupPossibleInAdvance()
{
return true;
}
@Nullable
@Override
public IdLookup idLookup()
{
return extractionFn == null ? this : null;
}
@Override
public int lookupId(String name)
{
if (extractionFn != null) {
throw new UnsupportedOperationException(
"cannot perform lookup when applying an extraction function"
);
}
return SimpleDictionaryEncodedColumn.this.lookupId(name);
}
}
if (hasMultipleValues()) {
class MultiValueDimensionSelector extends QueryableDimensionSelector
{
@Override
public IndexedInts getRow()
{
return multiValueColumn.get(offsetHolder.getOffset().getOffset());
}
@Override
public IndexedInts getRow(int offset)
{
return multiValueColumn.get(offset);
}
@Override
public ValueMatcher makeValueMatcher(String value)
{
return DimensionSelectorUtils.makeValueMatcherGeneric(this, value);
}
@Override
public ValueMatcher makeValueMatcher(Predicate<String> predicate)
{
return DimensionSelectorUtils.makeValueMatcherGeneric(this, predicate);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("multiValueColumn", multiValueColumn);
inspector.visit("offsetHolder", offsetHolder);
inspector.visit("offset", offsetHolder.getOffset());
inspector.visit("extractionFn", extractionFn);
}
}
return new MultiValueDimensionSelector();
} else {
class SingleValueQueryableDimensionSelector extends QueryableDimensionSelector
implements SingleValueHistoricalDimensionSelector
{
@Override
public IndexedInts getRow()
{
return new SingleIndexedInt(getRowValue());
}
@Override
public int getRowValue()
{
return column.get(offsetHolder.getOffset().getOffset());
}
@Override
public IndexedInts getRow(int offset)
{
return new SingleIndexedInt(getRowValue(offset));
}
@Override
public int getRowValue(int offset)
{
return column.get(offset);
}
@Override
public ValueMatcher makeValueMatcher(final String value)
{
if (extractionFn == null) {
final int valueId = lookupId(value);
if (valueId >= 0) {
return new ValueMatcher()
{
@Override
public boolean matches()
{
return getRowValue() == valueId;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("column", SimpleDictionaryEncodedColumn.this);
}
};
} else {
return BooleanValueMatcher.of(false);
}
} else {
// Employ precomputed BitSet optimization
return makeValueMatcher(Predicates.equalTo(value));
}
}
@Override
public ValueMatcher makeValueMatcher(final Predicate<String> predicate)
{
final BitSet predicateMatchingValueIds = DimensionSelectorUtils.makePredicateMatchingSet(
this,
predicate
);
return new ValueMatcher()
{
@Override
public boolean matches()
{
return predicateMatchingValueIds.get(getRowValue());
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("column", SimpleDictionaryEncodedColumn.this);
}
};
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("column", column);
inspector.visit("offsetHolder", offsetHolder);
inspector.visit("offset", offsetHolder.getOffset());
inspector.visit("extractionFn", extractionFn);
}
}
return new SingleValueQueryableDimensionSelector();
}
}
@Override
public void close() throws IOException
{

View File

@ -20,6 +20,7 @@
package io.druid.segment.data;
import io.druid.java.util.common.IAE;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import it.unimi.dsi.fastutil.ints.IntArrays;
import it.unimi.dsi.fastutil.ints.IntIterator;
import it.unimi.dsi.fastutil.ints.IntIterators;
@ -90,6 +91,11 @@ public final class ArrayBasedIndexedInts implements IndexedInts
@Override
public void close() throws IOException
{
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// nothing to inspect
}
}

View File

@ -23,7 +23,7 @@ import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
/**
*/
public class ArrayBasedOffset implements Offset
public class ArrayBasedOffset extends Offset
{
private final int[] ints;
private int currIndex;
@ -73,5 +73,6 @@ public class ArrayBasedOffset implements Offset
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// nothing to inspect
}
}

View File

@ -19,6 +19,8 @@
package io.druid.segment.data;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import java.util.Arrays;
import java.util.Iterator;
@ -67,4 +69,10 @@ public class ArrayIndexed<T> implements Indexed<T>
{
return Arrays.asList(baseArray).iterator();
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// nothing to inspect
}
}

View File

@ -21,6 +21,7 @@ package io.druid.segment.data;
import com.google.common.collect.Ordering;
import io.druid.collections.bitmap.ImmutableBitmap;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.IntIteratorUtils;
import it.unimi.dsi.fastutil.ints.IntIterator;
@ -96,6 +97,11 @@ public class BitmapCompressedIndexedInts implements IndexedInts, Comparable<Immu
@Override
public void close() throws IOException
{
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("immutableBitmap", immutableBitmap);
}
}

View File

@ -21,6 +21,7 @@ package io.druid.segment.data;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import java.io.Closeable;
import java.io.IOException;
@ -73,7 +74,7 @@ public class CachingIndexed<T> implements Indexed<T>, Closeable
@Override
public T get(int index)
{
if(cachedValues != null) {
if (cachedValues != null) {
final T cached = cachedValues.getValue(index);
if (cached != null) {
return cached;
@ -108,7 +109,14 @@ public class CachingIndexed<T> implements Indexed<T>, Closeable
}
}
private static class SizedLRUMap<K, V> extends LinkedHashMap<K, Pair<Integer, V>>
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("cachedValues", cachedValues != null);
inspector.visit("delegate", delegate);
}
private static class SizedLRUMap<K, V> extends LinkedHashMap<K, Pair<Integer, V>>
{
private final int maxBytes;
private int numBytes = 0;

View File

@ -27,6 +27,7 @@ import io.druid.collections.StupidResourceHolder;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.CompressedPools;
import it.unimi.dsi.fastutil.ints.IntIterator;
@ -367,5 +368,14 @@ public class CompressedIntsIndexedSupplier implements WritableSupplier<IndexedIn
{
Closeables.close(holder, false);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// ideally should inspect buffer, but at the moment of inspectRuntimeShape() call buffer is likely to be null,
// because loadBuffer() is not yet called, although during the processing it is not null, hence "visiting" null is
// not representative.
inspector.visit("singleThreadedIntBuffers", singleThreadedIntBuffers);
}
}
}

View File

@ -19,6 +19,7 @@
package io.druid.segment.data;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.io.Closeables;
import com.google.common.primitives.Ints;
@ -28,6 +29,7 @@ import io.druid.collections.StupidResourceHolder;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.CompressedPools;
import it.unimi.dsi.fastutil.ints.IntIterator;
@ -143,9 +145,7 @@ public class CompressedVSizeIntsIndexedSupplier implements WritableSupplier<Inde
baseBuffers.writeToChannel(channel);
}
/**
* For testing. Do not use unless you like things breaking
*/
@VisibleForTesting
GenericIndexed<ResourceHolder<ByteBuffer>> getBaseBuffers()
{
return baseBuffers;
@ -411,5 +411,14 @@ public class CompressedVSizeIntsIndexedSupplier implements WritableSupplier<Inde
{
Closeables.close(holder, false);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// ideally should inspect buffer and bigEndian, but at the moment of inspectRuntimeShape() call buffer is likely
// to be null and bigEndian = false, because loadBuffer() is not yet called, although during the processing buffer
// is not null, hence "visiting" null is not representative, and visiting bigEndian = false could be misleading.
inspector.visit("singleThreadedBuffers", singleThreadedBuffers);
}
}
}

View File

@ -19,6 +19,7 @@
package io.druid.segment.data;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import it.unimi.dsi.fastutil.ints.IntIterator;
import it.unimi.dsi.fastutil.ints.IntIterators;
@ -61,6 +62,11 @@ public class EmptyIndexedInts implements IndexedInts
@Override
public void close() throws IOException
{
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// nothing to inspect
}
}

View File

@ -27,6 +27,7 @@ import io.druid.java.util.common.IAE;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import it.unimi.dsi.fastutil.bytes.ByteArrays;
import java.io.Closeable;
@ -166,7 +167,7 @@ public class GenericIndexed<T> implements Indexed<T>
private int logBaseTwoOfElementsPerValueFile;
private int relativeIndexMask;
private ByteBuffer theBuffer;
private final ByteBuffer theBuffer;
/**
* Constructor for version one.
@ -210,6 +211,7 @@ public class GenericIndexed<T> implements Indexed<T>
{
this.versionOne = false;
this.theBuffer = null;
this.strategy = strategy;
this.allowReverseLookup = allowReverseLookup;
this.valueBuffers = valueBuffs;
@ -347,6 +349,21 @@ public class GenericIndexed<T> implements Indexed<T>
return strategy.fromByteBuffer(copyValueBuffer, size);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("versionOne", versionOne);
inspector.visit("headerBuffer", headerBuffer);
if (versionOne) {
inspector.visit("firstValueBuffer", firstValueBuffer);
} else {
// Inspecting just one example of valueBuffer, not needed to inspect the whole array, because all buffers in it
// are the same.
inspector.visit("valueBuffer", valueBuffers.length > 0 ? valueBuffers[0] : null);
}
inspector.visit("strategy", strategy);
}
abstract class BufferIndexed implements Indexed<T>
{
int lastReadSize;
@ -525,6 +542,14 @@ public class GenericIndexed<T> implements Indexed<T>
}
return bufferedIndexedGet(copyBuffer, startOffset, endOffset);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("headerBuffer", headerBuffer);
inspector.visit("copyBuffer", copyBuffer);
inspector.visit("strategy", strategy);
}
};
}
@ -633,6 +658,16 @@ public class GenericIndexed<T> implements Indexed<T>
int fileNum = index >> logBaseTwoOfElementsPerValueFile;
return bufferedIndexedGet(copyValueBuffers[fileNum], startOffset, endOffset);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("headerBuffer", headerBuffer);
// Inspecting just one example of copyValueBuffer, not needed to inspect the whole array, because all buffers
// in it are the same.
inspector.visit("copyValueBuffer", copyValueBuffers.length > 0 ? copyValueBuffers[0] : null);
inspector.visit("strategy", strategy);
}
};
}
}

View File

@ -19,12 +19,16 @@
package io.druid.segment.data;
public interface Indexed<T> extends Iterable<T>
import io.druid.query.monomorphicprocessing.CalledFromHotLoop;
import io.druid.query.monomorphicprocessing.HotLoopCallee;
public interface Indexed<T> extends Iterable<T>, HotLoopCallee
{
Class<? extends T> getClazz();
int size();
@CalledFromHotLoop
T get(int index);
/**

View File

@ -19,6 +19,8 @@
package io.druid.segment.data;
import io.druid.query.monomorphicprocessing.CalledFromHotLoop;
import io.druid.query.monomorphicprocessing.HotLoopCallee;
import it.unimi.dsi.fastutil.ints.IntIterable;
import java.io.Closeable;
@ -26,9 +28,11 @@ import java.io.Closeable;
/**
* Get a int an index (array or list lookup abstraction without boxing).
*/
public interface IndexedInts extends IntIterable, Closeable
public interface IndexedInts extends IntIterable, Closeable, HotLoopCallee
{
@CalledFromHotLoop
int size();
@CalledFromHotLoop
int get(int index);
void fill(int index, int[] toFill);
}

View File

@ -23,4 +23,6 @@ import java.io.Closeable;
public interface IndexedMultivalue<T extends IndexedInts> extends Indexed<T>, Closeable
{
@Override
T get(int index);
}

View File

@ -23,7 +23,7 @@ import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
/**
*/
public class IntersectingOffset implements Offset {
public class IntersectingOffset extends Offset {
private final Offset lhs;
private final Offset rhs;

View File

@ -19,6 +19,8 @@
package io.druid.segment.data;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import java.util.Iterator;
import java.util.List;
@ -67,4 +69,10 @@ public class ListIndexed<T> implements Indexed<T>
{
return baseList.iterator();
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("baseList", baseList);
}
}

View File

@ -19,19 +19,35 @@
package io.druid.segment.data;
import io.druid.annotations.SubclassesMustBePublic;
import io.druid.query.monomorphicprocessing.CalledFromHotLoop;
/**
* The "mutable" version of a ReadableOffset. Introduces "increment()" and "withinBounds()" methods, which are
* very similar to "next()" and "hasNext()" on the Iterator interface except increment() does not return a value.
*
* Annotated with {@link SubclassesMustBePublic} because Offset occurrences are replaced with a subclass in {@link
* io.druid.query.topn.Historical1SimpleDoubleAggPooledTopNScannerPrototype} and {@link
* io.druid.query.topn.HistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopNScannerPrototype} during
* specialization, and specialized version of those prototypes must be able to any subclass of Offset.
*/
public interface Offset extends ReadableOffset
@SubclassesMustBePublic
public abstract class Offset implements ReadableOffset, Cloneable
{
@CalledFromHotLoop
void increment();
public abstract void increment();
@CalledFromHotLoop
boolean withinBounds();
public abstract boolean withinBounds();
Offset clone();
@Override
public Offset clone()
{
try {
return (Offset) super.clone();
}
catch (CloneNotSupportedException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -20,6 +20,7 @@
package io.druid.segment.data;
import com.google.common.base.Preconditions;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import it.unimi.dsi.fastutil.ints.IntIterator;
import it.unimi.dsi.fastutil.ints.IntIterators;
@ -86,6 +87,11 @@ public class RangeIndexedInts implements IndexedInts
@Override
public void close() throws IOException
{
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// nothing to inspect
}
}

View File

@ -19,6 +19,7 @@
package io.druid.segment.data;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import it.unimi.dsi.fastutil.ints.IntIterator;
import it.unimi.dsi.fastutil.ints.IntIterators;
@ -64,4 +65,10 @@ public final class SingleIndexedInt implements IndexedInts
public void close() throws IOException
{
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// nothing to inspect
}
}

View File

@ -23,7 +23,7 @@ import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
/**
*/
public class UnioningOffset implements Offset
public class UnioningOffset extends Offset
{
private final Offset[] offsets = new Offset[2];
private final int[] offsetVals = new int[2];

View File

@ -24,6 +24,7 @@ import io.druid.common.utils.SerializerUtils;
import io.druid.io.ZeroCopyByteArrayOutputStream;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import java.io.IOException;
import java.nio.ByteBuffer;
@ -190,6 +191,12 @@ public class VSizeIndexed implements IndexedMultivalue<IndexedInts>
// no-op
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("theBuffer", theBuffer);
}
public WritableSupplier<IndexedMultivalue<IndexedInts>> asWritableSupplier() {
return new VSizeIndexedSupplier(this);
}

View File

@ -22,6 +22,7 @@ package io.druid.segment.data;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import io.druid.java.util.common.IAE;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import it.unimi.dsi.fastutil.ints.IntIterator;
import java.io.IOException;
@ -220,7 +221,12 @@ public class VSizeIndexedInts implements IndexedInts, Comparable<VSizeIndexedInt
@Override
public void close() throws IOException
{
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("buffer", buffer);
}
public WritableSupplier<IndexedInts> asWritableSupplier() {

View File

@ -19,6 +19,7 @@
package io.druid.segment.data;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import it.unimi.dsi.fastutil.ints.IntIterator;
import it.unimi.dsi.fastutil.ints.IntIterators;
@ -70,6 +71,11 @@ public class ZeroIndexedInts implements IndexedInts
@Override
public void close() throws IOException
{
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// nothing to inspect
}
}

View File

@ -28,6 +28,7 @@ import io.druid.query.filter.BooleanFilter;
import io.druid.query.filter.Filter;
import io.druid.query.filter.RowOffsetMatcherFactory;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.ColumnSelector;
import io.druid.segment.ColumnSelectorFactory;
@ -39,13 +40,13 @@ import java.util.List;
public class AndFilter implements BooleanFilter
{
private static final Joiner AND_JOINER = Joiner.on(" && ");
static final ValueMatcher[] EMPTY_VALUE_MATCHER_ARRAY = new ValueMatcher[0];
private final List<Filter> filters;
public AndFilter(
List<Filter> filters
)
public AndFilter(List<Filter> filters)
{
Preconditions.checkArgument(filters.size() > 0, "Can't construct empty AndFilter");
this.filters = filters;
}
@ -80,10 +81,6 @@ public class AndFilter implements BooleanFilter
@Override
public ValueMatcher makeMatcher(ColumnSelectorFactory factory)
{
if (filters.size() == 0) {
return BooleanValueMatcher.of(false);
}
final ValueMatcher[] matchers = new ValueMatcher[filters.size()];
for (int i = 0; i < filters.size(); i++) {
@ -117,19 +114,7 @@ public class AndFilter implements BooleanFilter
matchers.add(0, offsetMatcher);
}
return new ValueMatcher()
{
@Override
public boolean matches()
{
for (ValueMatcher valueMatcher : matchers) {
if (!valueMatcher.matches()) {
return false;
}
}
return true;
}
};
return makeMatcher(matchers.toArray(EMPTY_VALUE_MATCHER_ARRAY));
}
@Override
@ -181,6 +166,7 @@ public class AndFilter implements BooleanFilter
private ValueMatcher makeMatcher(final ValueMatcher[] baseMatchers)
{
Preconditions.checkState(baseMatchers.length > 0);
if (baseMatchers.length == 1) {
return baseMatchers[0];
}
@ -197,6 +183,15 @@ public class AndFilter implements BooleanFilter
}
return true;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("firstBaseMatcher", baseMatchers[0]);
inspector.visit("secondBaseMatcher", baseMatchers[1]);
// Don't inspect the 3rd and all consequent baseMatchers, cut runtime shape combinations at this point.
// Anyway if the filter is so complex, Hotspot won't inline all calls because of the inline limit.
}
};
}

View File

@ -29,6 +29,7 @@ import io.druid.query.filter.ValueGetter;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.filter.ValueMatcherColumnSelectorStrategy;
import io.druid.query.filter.ValueMatcherColumnSelectorStrategyFactory;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.ColumnSelector;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.DimensionHandlerUtils;
@ -74,6 +75,9 @@ public class ColumnComparisonFilter implements Filter
}
public static ValueMatcher makeValueMatcher(final ValueGetter[] valueGetters) {
if (valueGetters.length == 0) {
return BooleanValueMatcher.of(true);
}
return new ValueMatcher()
{
@Override
@ -93,6 +97,13 @@ public class ColumnComparisonFilter implements Filter
}
return true;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// All value getters are likely the same or similar (in terms of runtime shape), so inspecting only one of them.
inspector.visit("oneValueGetter", valueGetters[0]);
}
};
}

View File

@ -20,6 +20,7 @@
package io.druid.segment.filter;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
final class FalseValueMatcher implements ValueMatcher
{
@ -39,4 +40,10 @@ final class FalseValueMatcher implements ValueMatcher
{
return false;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// nothing to inspect
}
}

View File

@ -38,6 +38,7 @@ import io.druid.query.filter.Filter;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.filter.ValueMatcherColumnSelectorStrategy;
import io.druid.query.filter.ValueMatcherColumnSelectorStrategyFactory;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.ColumnSelector;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.DimensionHandlerUtils;
@ -453,6 +454,13 @@ public class Filters
{
return predicate.applyLong(longSelector.get());
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("longSelector", longSelector);
inspector.visit("predicate", predicate);
}
};
}

View File

@ -23,6 +23,7 @@ import io.druid.collections.bitmap.ImmutableBitmap;
import io.druid.query.filter.BitmapIndexSelector;
import io.druid.query.filter.Filter;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.ColumnSelector;
import io.druid.segment.ColumnSelectorFactory;
@ -60,6 +61,12 @@ public class NotFilter implements Filter
{
return !baseMatcher.matches();
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("baseMatcher", baseMatcher);
}
};
}

View File

@ -20,6 +20,7 @@
package io.druid.segment.filter;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import io.druid.collections.bitmap.ImmutableBitmap;
import io.druid.query.filter.BitmapIndexSelector;
@ -27,6 +28,7 @@ import io.druid.query.filter.BooleanFilter;
import io.druid.query.filter.Filter;
import io.druid.query.filter.RowOffsetMatcherFactory;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.ColumnSelector;
import io.druid.segment.ColumnSelectorFactory;
@ -41,13 +43,9 @@ public class OrFilter implements BooleanFilter
private final List<Filter> filters;
public OrFilter(
List<Filter> filters
)
public OrFilter(List<Filter> filters)
{
if (filters.size() == 0) {
throw new IllegalArgumentException("Can't construct empty OrFilter (the universe does not exist)");
}
Preconditions.checkArgument(filters.size() > 0, "Can't construct empty OrFilter (the universe does not exist)");
this.filters = filters;
}
@ -103,23 +101,13 @@ public class OrFilter implements BooleanFilter
matchers.add(0, offsetMatcher);
}
return new ValueMatcher()
{
@Override
public boolean matches()
{
for (ValueMatcher valueMatcher : matchers) {
if (valueMatcher.matches()) {
return true;
}
}
return false;
}
};
return makeMatcher(matchers.toArray(AndFilter.EMPTY_VALUE_MATCHER_ARRAY));
}
private ValueMatcher makeMatcher(final ValueMatcher[] baseMatchers){
Preconditions.checkState(baseMatchers.length > 0);
if (baseMatchers.length == 1) {
return baseMatchers[0];
}
@ -136,6 +124,15 @@ public class OrFilter implements BooleanFilter
}
return false;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("firstBaseMatcher", baseMatchers[0]);
inspector.visit("secondBaseMatcher", baseMatchers[1]);
// Don't inspect the 3rd and all consequent baseMatchers, cut runtime shape combinations at this point.
// Anyway if the filter is so complex, Hotspot won't inline all calls because of the inline limit.
}
};
}

View File

@ -20,6 +20,7 @@
package io.druid.segment.filter;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
final class TrueValueMatcher implements ValueMatcher
{
@ -39,4 +40,10 @@ final class TrueValueMatcher implements ValueMatcher
{
return true;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// nothing to inspect
}
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.historical;
import io.druid.segment.Cursor;
public interface HistoricalCursor extends Cursor, OffsetHolder
{
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.historical;
import io.druid.query.monomorphicprocessing.CalledFromHotLoop;
import io.druid.segment.DimensionSelector;
import io.druid.segment.data.IndexedInts;
/**
* Specialization for {@link DimensionSelector} queryable via offsets from {@link HistoricalCursor}.
*/
public interface HistoricalDimensionSelector extends DimensionSelector
{
@CalledFromHotLoop
IndexedInts getRow(int offset);
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.historical;
import io.druid.query.monomorphicprocessing.CalledFromHotLoop;
import io.druid.segment.FloatColumnSelector;
public interface HistoricalFloatColumnSelector extends FloatColumnSelector
{
@CalledFromHotLoop
float get(int offset);
}

View File

@ -0,0 +1,27 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.historical;
import io.druid.segment.data.Offset;
public interface OffsetHolder
{
Offset getOffset();
}

View File

@ -0,0 +1,30 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.historical;
import io.druid.query.monomorphicprocessing.CalledFromHotLoop;
import io.druid.segment.SingleValueDimensionSelector;
public interface SingleValueHistoricalDimensionSelector
extends HistoricalDimensionSelector, SingleValueDimensionSelector
{
@CalledFromHotLoop
int getRowValue(int offset);
}

View File

@ -25,6 +25,7 @@ import com.google.common.collect.Maps;
import io.druid.collections.bitmap.BitmapFactory;
import io.druid.collections.bitmap.MutableBitmap;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.DimensionHandler;
import io.druid.segment.DimensionIndexer;
import io.druid.segment.IndexableAdapter;
@ -301,6 +302,12 @@ public class IncrementalIndexAdapter implements IndexableAdapter
public void close() throws IOException
{
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("bitmapIndex", bitmapIndex);
}
}
@Override

View File

@ -490,6 +490,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// nothing to inspect
}
}
return new TimeLongColumnSelector();

View File

@ -22,15 +22,17 @@ package io.druid.segment.virtual;
import com.google.common.base.Predicate;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.monomorphicprocessing.CalledFromHotLoop;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.DimensionSelector;
import io.druid.segment.IdLookup;
import io.druid.segment.SingleValueDimensionSelector;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.ZeroIndexedInts;
import javax.annotation.Nullable;
import java.util.Objects;
public abstract class BaseSingleValueDimensionSelector implements DimensionSelector
public abstract class BaseSingleValueDimensionSelector implements SingleValueDimensionSelector
{
@CalledFromHotLoop
protected abstract String getValue();
@ -41,6 +43,12 @@ public abstract class BaseSingleValueDimensionSelector implements DimensionSelec
return ZeroIndexedInts.instance();
}
@Override
public int getRowValue()
{
return 0;
}
@Override
public int getValueCardinality()
{
@ -63,6 +71,12 @@ public abstract class BaseSingleValueDimensionSelector implements DimensionSelec
{
return Objects.equals(getValue(), value);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", BaseSingleValueDimensionSelector.this);
}
};
}
@ -76,6 +90,13 @@ public abstract class BaseSingleValueDimensionSelector implements DimensionSelec
{
return predicate.apply(getValue());
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", BaseSingleValueDimensionSelector.this);
inspector.visit("predicate", predicate);
}
};
}

View File

@ -130,6 +130,7 @@ public class ExpressionSelectors
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("baseSelector", baseSelector);
inspector.visit("extractionFn", extractionFn);
}
}
return new ExtractionExpressionDimensionSelector();

View File

@ -172,6 +172,7 @@ public class FilteredAggregatorTest
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// Don't care about runtime shape in tests
}
}
);

View File

@ -157,7 +157,12 @@ public class CardinalityAggregatorTest
@Override
public void close() throws IOException
{
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// Don't care about runtime shape in tests
}
};
}
@ -210,6 +215,7 @@ public class CardinalityAggregatorTest
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// Don't care about runtime shape in tests
}
}

View File

@ -97,5 +97,6 @@ class TestDimensionSelector implements DimensionSelector
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// Don't care about runtime shape in tests
}
}

View File

@ -34,6 +34,7 @@ public class StringRuntimeShapeTest
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// Don't care about runtime shape in tests
}
}

View File

@ -28,9 +28,12 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.primitives.Doubles;
import com.google.common.primitives.Longs;
import io.druid.collections.StupidPool;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.granularity.Granularity;
import io.druid.java.util.common.guava.Sequence;
@ -76,7 +79,6 @@ import io.druid.query.filter.SelectorDimFilter;
import io.druid.query.lookup.LookupExtractionFn;
import io.druid.query.ordering.StringComparators;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.segment.TestHelper;
import io.druid.segment.column.Column;
import io.druid.segment.column.ValueType;
@ -100,6 +102,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
*/
@ -111,13 +114,15 @@ public class TopNQueryRunnerTest
{
List<QueryRunner<Result<TopNResultValue>>> retVal = queryRunners();
List<Object[]> parameters = new ArrayList<>();
for (int i = 0; i < 8; i++) {
for (int i = 0; i < 32; i++) {
for (QueryRunner<Result<TopNResultValue>> firstParameter : retVal) {
Object[] params = new Object[4];
Object[] params = new Object[6];
params[0] = firstParameter;
params[1] = (i & 1) != 0;
params[2] = (i & 2) != 0;
params[3] = (i & 4) != 0;
params[4] = (i & 8) != 0;
params[5] = (i & 16) != 0;
parameters.add(params);
}
}
@ -174,12 +179,20 @@ public class TopNQueryRunnerTest
QueryRunner<Result<TopNResultValue>> runner,
boolean specializeGeneric1AggPooledTopN,
boolean specializeGeneric2AggPooledTopN,
boolean specializeHistorical1SimpleDoubleAggPooledTopN,
boolean specializeHistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopN,
boolean duplicateSingleAggregatorQueries
)
{
this.runner = runner;
PooledTopNAlgorithm.specializeGeneric1AggPooledTopN = specializeGeneric1AggPooledTopN;
PooledTopNAlgorithm.specializeGeneric2AggPooledTopN = specializeGeneric2AggPooledTopN;
PooledTopNAlgorithm.setSpecializeGeneric1AggPooledTopN(specializeGeneric1AggPooledTopN);
PooledTopNAlgorithm.setSpecializeGeneric2AggPooledTopN(specializeGeneric2AggPooledTopN);
PooledTopNAlgorithm.setSpecializeHistorical1SimpleDoubleAggPooledTopN(
specializeHistorical1SimpleDoubleAggPooledTopN
);
PooledTopNAlgorithm.setSpecializeHistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopN(
specializeHistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopN
);
this.duplicateSingleAggregatorQueries = duplicateSingleAggregatorQueries;
}
@ -1886,19 +1899,7 @@ public class TopNQueryRunnerTest
.build();
Granularity gran = Granularities.DAY;
TimeseriesQuery tsQuery = Druids.newTimeseriesQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(gran)
.intervals(QueryRunnerTestHelper.fullOnInterval)
.aggregators(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
QueryRunnerTestHelper.indexDoubleSum,
QueryRunnerTestHelper.qualityUniques
)
)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.build();
List<Result<TopNResultValue>> expectedResults = Arrays.asList(
new Result<>(
new DateTime("2011-01-12T00:00:00.000Z"),
@ -4966,79 +4967,98 @@ public class TopNQueryRunnerTest
@Test
public void testFullOnTopNWithAggsOnNumericDims()
{
TopNQuery query = new TopNQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.allGran)
.dimension(QueryRunnerTestHelper.marketDimension)
.metric(QueryRunnerTestHelper.indexMetric)
.threshold(4)
.intervals(QueryRunnerTestHelper.fullOnInterval)
.aggregators(
Lists.<AggregatorFactory>newArrayList(
Iterables.concat(
QueryRunnerTestHelper.commonAggregators,
Lists.newArrayList(
new DoubleMaxAggregatorFactory("maxIndex", "index"),
new DoubleMinAggregatorFactory("minIndex", "index"),
new LongSumAggregatorFactory("qlLong", "qualityLong"),
new DoubleSumAggregatorFactory("qlFloat", "qualityLong"),
new DoubleSumAggregatorFactory("qfFloat", "qualityFloat"),
new LongSumAggregatorFactory("qfLong", "qualityFloat")
)
)
)
)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.build();
List<Result<TopNResultValue>> expectedResults = Arrays.asList(
new Result<TopNResultValue>(
new DateTime("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put(QueryRunnerTestHelper.marketDimension, "total_market")
.put("rows", 186L)
.put("index", 215679.82879638672D)
.put("addRowsIndexConstant", 215866.82879638672D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_2)
.put("maxIndex", 1743.9217529296875D)
.put("minIndex", 792.3260498046875D)
.put("qlLong", 279000L)
.put("qlFloat", 279000.0)
.put("qfFloat", 2790000.0)
.put("qfLong", 2790000L)
.build(),
ImmutableMap.<String, Object>builder()
.put(QueryRunnerTestHelper.marketDimension, "upfront")
.put("rows", 186L)
.put("index", 192046.1060180664D)
.put("addRowsIndexConstant", 192233.1060180664D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_2)
.put("maxIndex", 1870.06103515625D)
.put("minIndex", 545.9906005859375D)
.put("qlLong", 279000L)
.put("qlFloat", 279000.0)
.put("qfFloat", 2790000.0)
.put("qfLong", 2790000L)
.build(),
ImmutableMap.<String, Object>builder()
.put(QueryRunnerTestHelper.marketDimension, "spot")
.put("rows", 837L)
.put("index", 95606.57232284546D)
.put("addRowsIndexConstant", 96444.57232284546D)
.put("uniques", QueryRunnerTestHelper.UNIQUES_9)
.put("maxIndex", 277.2735290527344D)
.put("minIndex", 59.02102279663086D)
.put("qlLong", 1171800L)
.put("qlFloat", 1171800.0)
.put("qfFloat", 11718000.0)
.put("qfLong", 11718000L)
.build()
)
)
)
List<Pair<AggregatorFactory, List<?>>> aggregations = new ArrayList<>();
aggregations.add(new Pair<>(
QueryRunnerTestHelper.rowsCount,
Longs.asList(186L, 186L, 837L)
));
Pair<AggregatorFactory, List<?>> indexAggregation = new Pair<>(
QueryRunnerTestHelper.indexDoubleSum,
Doubles.asList(215679.82879638672D, 192046.1060180664D, 95606.57232284546D)
);
assertExpectedResults(expectedResults, query);
aggregations.add(indexAggregation);
aggregations.add(new Pair<>(
QueryRunnerTestHelper.qualityUniques,
Doubles.asList(QueryRunnerTestHelper.UNIQUES_2, QueryRunnerTestHelper.UNIQUES_2, QueryRunnerTestHelper.UNIQUES_9)
));
aggregations.add(new Pair<>(
new DoubleMaxAggregatorFactory("maxIndex", "index"),
Doubles.asList(1743.9217529296875D, 1870.06103515625D, 277.2735290527344D)
));
aggregations.add(new Pair<>(
new DoubleMinAggregatorFactory("minIndex", "index"),
Doubles.asList(792.3260498046875D, 545.9906005859375D, 59.02102279663086D)
));
aggregations.add(new Pair<>(
new LongSumAggregatorFactory("qlLong", "qualityLong"),
Longs.asList(279000L, 279000L, 1171800L)
));
aggregations.add(new Pair<>(
new DoubleSumAggregatorFactory("qlFloat", "qualityLong"),
Doubles.asList(279000.0, 279000.0, 1171800.0)
));
aggregations.add(new Pair<>(
new DoubleSumAggregatorFactory("qfFloat", "qualityFloat"),
Doubles.asList(2790000.0, 2790000.0, 11718000.0)
));
aggregations.add(new Pair<>(
new LongSumAggregatorFactory("qfLong", "qualityFloat"),
Longs.asList(2790000L, 2790000L, 11718000L)
));
List<List<Pair<AggregatorFactory, List<?>>>> aggregationCombinations = new ArrayList<>();
for (Pair<AggregatorFactory, List<?>> aggregation : aggregations) {
aggregationCombinations.add(Collections.singletonList(aggregation));
}
aggregationCombinations.add(aggregations);
for (List<Pair<AggregatorFactory, List<?>>> aggregationCombination : aggregationCombinations) {
boolean hasIndexAggregator = aggregationCombination.stream().anyMatch(agg -> "index".equals(agg.lhs.getName()));
boolean hasRowsAggregator = aggregationCombination.stream().anyMatch(agg -> "rows".equals(agg.lhs.getName()));
TopNQueryBuilder queryBuilder = new TopNQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.allGran)
.dimension(QueryRunnerTestHelper.marketDimension)
.threshold(4)
.intervals(QueryRunnerTestHelper.fullOnInterval)
.aggregators(aggregationCombination.stream().map(agg -> agg.lhs).collect(Collectors.toList()));
String metric;
if (hasIndexAggregator) {
metric = "index";
} else {
metric = aggregationCombination.get(0).lhs.getName();
}
queryBuilder.metric(metric);
if (hasIndexAggregator && hasRowsAggregator) {
queryBuilder.postAggregators(Collections.singletonList(QueryRunnerTestHelper.addRowsIndexConstant));
}
TopNQuery query = queryBuilder.build();
ImmutableMap.Builder<String, Object> row1 = ImmutableMap.<String, Object>builder()
.put(QueryRunnerTestHelper.marketDimension, "total_market");
ImmutableMap.Builder<String, Object> row2 = ImmutableMap.<String, Object>builder()
.put(QueryRunnerTestHelper.marketDimension, "upfront");
ImmutableMap.Builder<String, Object> row3 = ImmutableMap.<String, Object>builder()
.put(QueryRunnerTestHelper.marketDimension, "spot");
if (hasIndexAggregator && hasRowsAggregator) {
row1.put("addRowsIndexConstant", 215866.82879638672D);
row2.put("addRowsIndexConstant", 192233.1060180664D);
row3.put("addRowsIndexConstant", 96444.57232284546D);
}
aggregationCombination.forEach(agg -> {
row1.put(agg.lhs.getName(), agg.rhs.get(0));
row2.put(agg.lhs.getName(), agg.rhs.get(1));
row3.put(agg.lhs.getName(), agg.rhs.get(2));
});
List<ImmutableMap<String, Object>> rows = Lists.newArrayList(row1.build(), row2.build(), row3.build());
rows.sort((r1, r2) -> ((Comparable) r2.get(metric)).compareTo(r1.get(metric)));
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
new DateTime("2011-01-12T00:00:00.000Z"),
new TopNResultValue(rows)
)
);
assertExpectedResults(expectedResults, query);
}
}
}

View File

@ -23,11 +23,9 @@ import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
public abstract class TestFloatColumnSelector implements FloatColumnSelector
{
/**
* Don't care about runtime shape in tests
*/
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// Don't care about runtime shape in tests
}
}

View File

@ -23,11 +23,9 @@ import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
public abstract class TestLongColumnSelector implements LongColumnSelector
{
/**
* Don't care about runtime shape in tests
*/
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// Don't care about runtime shape in tests
}
}

View File

@ -42,7 +42,8 @@ public class IndexedIntsTest
{
return Arrays.asList(
new Object[][]{
{VSizeIndexedInts.fromArray(array)}
{VSizeIndexedInts.fromArray(array)},
{ArrayBasedIndexedInts.of(array)}
}
);
}

View File

@ -46,15 +46,13 @@ import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ColumnCapabilitiesImpl;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.IndexedInts;
import it.unimi.dsi.fastutil.ints.IntIterator;
import it.unimi.dsi.fastutil.ints.IntIterators;
import io.druid.segment.data.ZeroIndexedInts;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
@ -280,38 +278,7 @@ public class VirtualColumnsTest
@Override
public IndexedInts getRow()
{
return new IndexedInts()
{
@Override
public int size()
{
return 1;
}
@Override
public int get(int index)
{
return 0;
}
@Override
public IntIterator iterator()
{
return IntIterators.singleton(0);
}
@Override
public void fill(int index, int[] toFill)
{
throw new UnsupportedOperationException("fill not supported");
}
@Override
public void close() throws IOException
{
}
};
return ZeroIndexedInts.instance();
}
@Override
@ -362,6 +329,7 @@ public class VirtualColumnsTest
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
// Don't care about runtime shape in tests
}
};