Monomorphic processing of TopN queries with 1 and 2 aggregators (key part of #3798) (#3889)

* Monomorphic processing: add HotLoopCallee, CalledFromHotLoop, RuntimeShapeInspector, SpecializationService. Specialize topN queries with 1 or 2 aggregators. Add Cursor.advanceUninterruptibly() and isDoneOrInterrupted() for exception-free query processing.

* Use Execs.singleThreaded()

* RuntimeShapeInspector to support nullable fields

* Make CalledFromHotLoop annotation Inherited

* Remove unnecessary conversion of array of ColumnSelectorPluses to list and back to array in CardinalityAggregatorFactory

* Close InputStream in SpecializationService

* Formatting

* Test specialized PooledTopNScanners

* Set flags in PooledTopNAlgorithm directly

* Fix tests, dependent on CountAggragatorFactory toString() form

* Fix

* Revert CountAggregatorFactory changes

* Implement inspectRuntimeShape() for LongWrappingDimensionSelector and FloatWrappingDimensionSelector

* Remove duplicate RoaringBitmap dependency in the extendedset pom.xml

* Fix

* Treat ByteBuffers specially in StringRuntimeShape

* Doc fix

* Annotate BufferAggregator.init() with CalledFromHotLoop

* Make triggerSpecializationIterationsThreshold an int

* Remove SpecializationService.PerPrototypeClassState.of()

* Add comments

* Limit the amount of specializations that SpecializationService could make

* Add default implementation for BufferAggregator.inspectRuntimeShape(), for compatibility with extensions

* Use more efficient ConcurrentMap's idioms in SpecializationService
This commit is contained in:
Roman Leventov 2017-03-17 13:44:36 -06:00 committed by Himanshu
parent 3ec1877887
commit 84fe91ba0b
104 changed files with 2497 additions and 580 deletions

View File

@ -83,7 +83,7 @@ public class DistinctCountAggregatorFactory extends AggregatorFactory
{
DimensionSelector selector = makeDimensionSelector(columnFactory);
if (selector == null) {
return new EmptyDistinctCountBufferAggregator();
return EmptyDistinctCountBufferAggregator.instance();
} else {
return new DistinctCountBufferAggregator(makeDimensionSelector(columnFactory));
}

View File

@ -22,16 +22,17 @@ package io.druid.query.aggregation.distinctcount;
import io.druid.collections.bitmap.MutableBitmap;
import io.druid.collections.bitmap.WrappedRoaringBitmap;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.DimensionSelector;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
public class DistinctCountBufferAggregator implements BufferAggregator
{
private final DimensionSelector selector;
private final Map<Integer, MutableBitmap> mutableBitmapCollection = new HashMap<>();
private final Int2ObjectMap<MutableBitmap> mutableBitmapCollection = new Int2ObjectOpenHashMap<>();
public DistinctCountBufferAggregator(
DimensionSelector selector
@ -89,4 +90,10 @@ public class DistinctCountBufferAggregator implements BufferAggregator
{
mutableBitmapCollection.clear();
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
}
}

View File

@ -20,13 +20,24 @@
package io.druid.query.aggregation.distinctcount;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import java.nio.ByteBuffer;
public class EmptyDistinctCountBufferAggregator implements BufferAggregator
/**
* The difference from {@link io.druid.query.aggregation.NoopBufferAggregator} is that
* EmptyDistinctCountBufferAggregator returns 0 instead of null from {@link #get(ByteBuffer, int)}.
*/
public final class EmptyDistinctCountBufferAggregator implements BufferAggregator
{
private static final EmptyDistinctCountBufferAggregator INSTANCE = new EmptyDistinctCountBufferAggregator();
public EmptyDistinctCountBufferAggregator()
static EmptyDistinctCountBufferAggregator instance()
{
return INSTANCE;
}
private EmptyDistinctCountBufferAggregator()
{
}
@ -62,4 +73,9 @@ public class EmptyDistinctCountBufferAggregator implements BufferAggregator
public void close()
{
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
}
}

View File

@ -20,6 +20,7 @@
package io.druid.query.aggregation;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.ObjectColumnSelector;
import java.nio.ByteBuffer;
@ -79,7 +80,14 @@ public class TimestampBufferAggregator implements BufferAggregator
}
@Override
public void close() {
public void close()
{
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
inspector.visit("comparator", comparator);
}
}

View File

@ -20,12 +20,20 @@
package io.druid.query.aggregation.datasketches.theta;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import java.nio.ByteBuffer;
public class EmptySketchBufferAggregator implements BufferAggregator
public final class EmptySketchBufferAggregator implements BufferAggregator
{
public EmptySketchBufferAggregator()
private static final EmptySketchBufferAggregator INSTANCE = new EmptySketchBufferAggregator();
public static EmptySketchBufferAggregator instance()
{
return INSTANCE;
}
private EmptySketchBufferAggregator()
{
}
@ -61,4 +69,9 @@ public class EmptySketchBufferAggregator implements BufferAggregator
public void close()
{
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
}
}

View File

@ -73,7 +73,7 @@ public abstract class SketchAggregatorFactory extends AggregatorFactory
{
ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(fieldName);
if (selector == null) {
return new EmptySketchBufferAggregator();
return EmptySketchBufferAggregator.instance();
} else {
return new SketchBufferAggregator(selector, size, getMaxIntermediateSize());
}

View File

@ -26,6 +26,7 @@ import com.yahoo.sketches.Family;
import com.yahoo.sketches.theta.SetOperation;
import com.yahoo.sketches.theta.Union;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.ObjectColumnSelector;
import java.nio.ByteBuffer;
@ -113,4 +114,9 @@ public class SketchBufferAggregator implements BufferAggregator
unions.clear();
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
}
}

View File

@ -20,6 +20,7 @@
package io.druid.query.aggregation.histogram;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.FloatColumnSelector;
import java.nio.ByteBuffer;
@ -99,4 +100,10 @@ public class ApproximateHistogramBufferAggregator implements BufferAggregator
{
// no resources to cleanup
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
}
}

View File

@ -20,6 +20,7 @@
package io.druid.query.aggregation.histogram;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.ObjectColumnSelector;
import java.nio.ByteBuffer;
@ -102,4 +103,10 @@ public class ApproximateHistogramFoldingBufferAggregator implements BufferAggreg
{
// no resources to cleanup
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
}
}

View File

@ -23,14 +23,14 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.StringUtils;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.Aggregators;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.aggregation.NoopAggregator;
import io.druid.query.aggregation.NoopBufferAggregator;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.ObjectColumnSelector;
import org.apache.commons.codec.binary.Base64;
@ -95,7 +95,7 @@ public class VarianceAggregatorFactory extends AggregatorFactory
{
ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(fieldName);
if (selector == null) {
return Aggregators.noopAggregator();
return NoopAggregator.instance();
}
if ("float".equalsIgnoreCase(inputType)) {
@ -115,7 +115,7 @@ public class VarianceAggregatorFactory extends AggregatorFactory
{
ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(fieldName);
if (selector == null) {
return Aggregators.noopBufferAggregator();
return NoopBufferAggregator.instance();
}
if ("float".equalsIgnoreCase(inputType)) {
return new VarianceBufferAggregator.FloatVarianceAggregator(

View File

@ -22,6 +22,7 @@ package io.druid.query.aggregation.variance;
import com.google.common.primitives.Doubles;
import com.google.common.primitives.Longs;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.ObjectColumnSelector;
@ -102,6 +103,12 @@ public abstract class VarianceBufferAggregator implements BufferAggregator
buf.putDouble(position + NVARIANCE_OFFSET, variance);
}
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
}
}
public static final class LongVarianceAggregator extends VarianceBufferAggregator
@ -128,6 +135,12 @@ public abstract class VarianceBufferAggregator implements BufferAggregator
buf.putDouble(position + NVARIANCE_OFFSET, variance);
}
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
}
}
public static final class ObjectVarianceAggregator extends VarianceBufferAggregator
@ -167,5 +180,11 @@ public abstract class VarianceBufferAggregator implements BufferAggregator
buf.putDouble(position + SUM_OFFSET, sum);
buf.putDouble(position + NVARIANCE_OFFSET, nvariance);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
}
}
}

View File

@ -20,10 +20,9 @@
package io.druid.query.aggregation.variance;
import com.google.common.collect.Lists;
import io.druid.java.util.common.Pair;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.TestFloatColumnSelector;
import org.junit.Assert;
import org.junit.Test;
@ -142,7 +141,7 @@ public class VarianceAggregatorCollectorTest
}
}
private static class FloatHandOver implements FloatColumnSelector
private static class FloatHandOver extends TestFloatColumnSelector
{
float v;

View File

@ -54,7 +54,7 @@ public class VarianceTopNQueryTest
@Parameterized.Parameters(name="{0}")
public static Iterable<Object[]> constructorFeeder() throws IOException
{
return TopNQueryRunnerTest.constructorFeeder();
return QueryRunnerTestHelper.transformToConstructionFeeder(TopNQueryRunnerTest.queryRunners());
}
private final QueryRunner runner;

13
pom.xml
View File

@ -638,6 +638,16 @@
<artifactId>RoaringBitmap</artifactId>
<version>0.5.18</version>
</dependency>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
<version>5.2</version>
</dependency>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm-commons</artifactId>
<version>5.2</version>
</dependency>
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
@ -815,10 +825,11 @@
<version>1.0</version>
</signature>
<ignores>
<!-- Some of our code uses DirectBuffer & Cleaner directly, which are not part of
<!-- Some of our code uses sun.* classes directly, which are not part of
the JDK signature (although they are there anyway). -->
<ignore>sun.nio.ch.DirectBuffer</ignore>
<ignore>sun.misc.Cleaner</ignore>
<ignore>sun.misc.Unsafe</ignore>
</ignores>
</configuration>
</execution>

View File

@ -93,6 +93,14 @@
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</dependency>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
</dependency>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm-commons</artifactId>
</dependency>
<!-- Tests -->
<dependency>

View File

@ -97,6 +97,13 @@ public abstract class BaseQuery<T extends Comparable<T>> implements Query<T>
}
}
public static void checkInterrupted()
{
if (Thread.interrupted()) {
throw new QueryInterruptedException(new InterruptedException());
}
}
public static final String QUERYID = "queryId";
private final DataSource dataSource;
private final boolean descending;

View File

@ -1,112 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation;
import java.nio.ByteBuffer;
/**
*/
public class Aggregators
{
public static Aggregator noopAggregator()
{
return new Aggregator()
{
@Override
public void aggregate()
{
}
@Override
public void reset()
{
}
@Override
public Object get()
{
return null;
}
@Override
public float getFloat()
{
return 0;
}
@Override
public void close()
{
}
@Override
public long getLong()
{
return 0;
}
};
}
public static BufferAggregator noopBufferAggregator()
{
return new BufferAggregator()
{
@Override
public void init(ByteBuffer buf, int position)
{
}
@Override
public void aggregate(ByteBuffer buf, int position)
{
}
@Override
public Object get(ByteBuffer buf, int position)
{
return null;
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
return 0;
}
@Override
public long getLong(ByteBuffer buf, int position)
{
return 0L;
}
@Override
public void close()
{
}
};
}
}

View File

@ -19,6 +19,10 @@
package io.druid.query.aggregation;
import io.druid.query.monomorphicprocessing.CalledFromHotLoop;
import io.druid.query.monomorphicprocessing.HotLoopCallee;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import java.nio.ByteBuffer;
/**
@ -29,7 +33,7 @@ import java.nio.ByteBuffer;
* Thus, an Aggregator can be thought of as a closure over some other thing that is stateful and changes between calls
* to aggregate(...).
*/
public interface BufferAggregator
public interface BufferAggregator extends HotLoopCallee
{
/**
* Initializes the buffer location
@ -44,6 +48,7 @@ public interface BufferAggregator
* @param buf byte buffer to initialize
* @param position offset within the byte buffer for initialization
*/
@CalledFromHotLoop
void init(ByteBuffer buf, int position);
/**
@ -57,6 +62,7 @@ public interface BufferAggregator
* @param buf byte buffer storing the byte array representation of the aggregate
* @param position offset within the byte buffer at which the current aggregate value is stored
*/
@CalledFromHotLoop
void aggregate(ByteBuffer buf, int position);
/**
@ -110,4 +116,14 @@ public interface BufferAggregator
* Release any resources used by the aggregator
*/
void close();
/**
* {@inheritDoc}
*
* <p>The default implementation inspects nothing. Classes that implement {@code BufferAggregator} are encouraged to
* override this method, following the specification of {@link HotLoopCallee#inspectRuntimeShape}.
*/
default void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
}
}

View File

@ -19,6 +19,8 @@
package io.druid.query.aggregation;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import java.nio.ByteBuffer;
/**
@ -62,4 +64,9 @@ public class CountBufferAggregator implements BufferAggregator
{
// no resources to cleanup
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
}
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.FloatColumnSelector;
import java.nio.ByteBuffer;
public abstract class DoubleBufferAggregator implements BufferAggregator
{
protected final FloatColumnSelector selector;
DoubleBufferAggregator(FloatColumnSelector selector)
{
this.selector = selector;
}
@Override
public Object get(ByteBuffer buf, int position)
{
return buf.getDouble(position);
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
return (float) buf.getDouble(position);
}
@Override
public long getLong(ByteBuffer buf, int position)
{
return (long) buf.getDouble(position);
}
@Override
public void close()
{
// no resources to cleanup
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
}
}

View File

@ -25,13 +25,12 @@ import java.nio.ByteBuffer;
/**
*/
public class DoubleMaxBufferAggregator implements BufferAggregator
public class DoubleMaxBufferAggregator extends DoubleBufferAggregator
{
private final FloatColumnSelector selector;
public DoubleMaxBufferAggregator(FloatColumnSelector selector)
DoubleMaxBufferAggregator(FloatColumnSelector selector)
{
this.selector = selector;
super(selector);
}
@Override
@ -45,28 +44,4 @@ public class DoubleMaxBufferAggregator implements BufferAggregator
{
buf.putDouble(position, Math.max(buf.getDouble(position), (double) selector.get()));
}
@Override
public Object get(ByteBuffer buf, int position)
{
return buf.getDouble(position);
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
return (float) buf.getDouble(position);
}
@Override
public long getLong(ByteBuffer buf, int position)
{
return (long) buf.getDouble(position);
}
@Override
public void close()
{
// no resources to cleanup
}
}

View File

@ -25,13 +25,12 @@ import java.nio.ByteBuffer;
/**
*/
public class DoubleMinBufferAggregator implements BufferAggregator
public class DoubleMinBufferAggregator extends DoubleBufferAggregator
{
private final FloatColumnSelector selector;
public DoubleMinBufferAggregator(FloatColumnSelector selector)
DoubleMinBufferAggregator(FloatColumnSelector selector)
{
this.selector = selector;
super(selector);
}
@Override
@ -45,29 +44,4 @@ public class DoubleMinBufferAggregator implements BufferAggregator
{
buf.putDouble(position, Math.min(buf.getDouble(position), (double) selector.get()));
}
@Override
public Object get(ByteBuffer buf, int position)
{
return buf.getDouble(position);
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
return (float) buf.getDouble(position);
}
@Override
public long getLong(ByteBuffer buf, int position)
{
return (long) buf.getDouble(position);
}
@Override
public void close()
{
// no resources to cleanup
}
}

View File

@ -25,15 +25,12 @@ import java.nio.ByteBuffer;
/**
*/
public class DoubleSumBufferAggregator implements BufferAggregator
public class DoubleSumBufferAggregator extends DoubleBufferAggregator
{
private final FloatColumnSelector selector;
public DoubleSumBufferAggregator(
FloatColumnSelector selector
)
DoubleSumBufferAggregator(FloatColumnSelector selector)
{
this.selector = selector;
super(selector);
}
@Override
@ -47,29 +44,4 @@ public class DoubleSumBufferAggregator implements BufferAggregator
{
buf.putDouble(position, buf.getDouble(position) + (double) selector.get());
}
@Override
public Object get(ByteBuffer buf, int position)
{
return buf.getDouble(position);
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
return (float) buf.getDouble(position);
}
@Override
public long getLong(ByteBuffer buf, int position)
{
return (long) buf.getDouble(position);
}
@Override
public void close()
{
// no resources to cleanup
}
}

View File

@ -20,6 +20,7 @@
package io.druid.query.aggregation;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import java.nio.ByteBuffer;
@ -71,4 +72,11 @@ public class FilteredBufferAggregator implements BufferAggregator
{
delegate.close();
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("matcher", matcher);
inspector.visit("delegate", delegate);
}
}

View File

@ -21,6 +21,7 @@ package io.druid.query.aggregation;
import com.google.common.primitives.Floats;
import com.google.common.primitives.Longs;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.FloatColumnSelector;
import java.nio.ByteBuffer;
@ -105,4 +106,10 @@ public class HistogramBufferAggregator implements BufferAggregator
{
// no resources to cleanup
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
}
}

View File

@ -20,6 +20,7 @@
package io.druid.query.aggregation;
import com.google.common.collect.Lists;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.ObjectColumnSelector;
import java.nio.ByteBuffer;
@ -74,4 +75,11 @@ public class JavaScriptBufferAggregator implements BufferAggregator
public void close() {
script.close();
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selectorList", selectorList);
inspector.visit("script", script);
}
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.LongColumnSelector;
import java.nio.ByteBuffer;
public abstract class LongBufferAggregator implements BufferAggregator
{
protected final LongColumnSelector selector;
LongBufferAggregator(LongColumnSelector selector)
{
this.selector = selector;
}
@Override
public Object get(ByteBuffer buf, int position)
{
return buf.getLong(position);
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
return (float) buf.getLong(position);
}
@Override
public long getLong(ByteBuffer buf, int position)
{
return buf.getLong(position);
}
@Override
public void close()
{
// no resources to cleanup
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
}
}

View File

@ -25,13 +25,12 @@ import java.nio.ByteBuffer;
/**
*/
public class LongMaxBufferAggregator implements BufferAggregator
public class LongMaxBufferAggregator extends LongBufferAggregator
{
private final LongColumnSelector selector;
public LongMaxBufferAggregator(LongColumnSelector selector)
LongMaxBufferAggregator(LongColumnSelector selector)
{
this.selector = selector;
super(selector);
}
@Override
@ -45,28 +44,4 @@ public class LongMaxBufferAggregator implements BufferAggregator
{
buf.putLong(position, Math.max(buf.getLong(position), selector.get()));
}
@Override
public Object get(ByteBuffer buf, int position)
{
return buf.getLong(position);
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
return (float) buf.getLong(position);
}
@Override
public long getLong(ByteBuffer buf, int position)
{
return buf.getLong(position);
}
@Override
public void close()
{
// no resources to cleanup
}
}

View File

@ -25,13 +25,12 @@ import java.nio.ByteBuffer;
/**
*/
public class LongMinBufferAggregator implements BufferAggregator
public class LongMinBufferAggregator extends LongBufferAggregator
{
private final LongColumnSelector selector;
public LongMinBufferAggregator(LongColumnSelector selector)
LongMinBufferAggregator(LongColumnSelector selector)
{
this.selector = selector;
super(selector);
}
@Override
@ -45,29 +44,4 @@ public class LongMinBufferAggregator implements BufferAggregator
{
buf.putLong(position, Math.min(buf.getLong(position), selector.get()));
}
@Override
public Object get(ByteBuffer buf, int position)
{
return buf.getLong(position);
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
return (float) buf.getLong(position);
}
@Override
public long getLong(ByteBuffer buf, int position)
{
return buf.getLong(position);
}
@Override
public void close()
{
// no resources to cleanup
}
}

View File

@ -25,15 +25,12 @@ import java.nio.ByteBuffer;
/**
*/
public class LongSumBufferAggregator implements BufferAggregator
public class LongSumBufferAggregator extends LongBufferAggregator
{
private final LongColumnSelector selector;
public LongSumBufferAggregator(
LongColumnSelector selector
)
LongSumBufferAggregator(LongColumnSelector selector)
{
this.selector = selector;
super(selector);
}
@Override
@ -47,29 +44,4 @@ public class LongSumBufferAggregator implements BufferAggregator
{
buf.putLong(position, buf.getLong(position) + selector.get());
}
@Override
public Object get(ByteBuffer buf, int position)
{
return buf.getLong(position);
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
return (float) buf.getLong(position);
}
@Override
public long getLong(ByteBuffer buf, int position)
{
return buf.getLong(position);
}
@Override
public void close()
{
// no resources to cleanup
}
}

View File

@ -17,47 +17,51 @@
* under the License.
*/
package io.druid.segment;
package io.druid.query.aggregation;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.Offset;
/**
*/
class IndexedIntsOffset implements Offset
public final class NoopAggregator implements Aggregator
{
int currRow;
private final IndexedInts invertedIndex;
private static final NoopAggregator INSTANCE = new NoopAggregator();
public IndexedIntsOffset(IndexedInts invertedIndex)
public static NoopAggregator instance()
{
return INSTANCE;
}
private NoopAggregator()
{
this.invertedIndex = invertedIndex;
currRow = 0;
}
@Override
public void increment()
public void aggregate()
{
++currRow;
}
@Override
public boolean withinBounds()
public void reset()
{
return currRow < invertedIndex.size();
}
@Override
public Offset clone()
public Object get()
{
final IndexedIntsOffset retVal = new IndexedIntsOffset(invertedIndex);
retVal.currRow = currRow;
return retVal;
return null;
}
@Override
public int getOffset()
public float getFloat()
{
return invertedIndex.get(currRow);
return 0;
}
@Override
public void close()
{
}
@Override
public long getLong()
{
return 0;
}
}

View File

@ -0,0 +1,77 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import java.nio.ByteBuffer;
public final class NoopBufferAggregator implements BufferAggregator
{
private static final NoopBufferAggregator INSTANCE = new NoopBufferAggregator();
public static NoopBufferAggregator instance()
{
return INSTANCE;
}
private NoopBufferAggregator()
{
}
@Override
public void init(ByteBuffer buf, int position)
{
}
@Override
public void aggregate(ByteBuffer buf, int position)
{
}
@Override
public Object get(ByteBuffer buf, int position)
{
return null;
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
return 0;
}
@Override
public long getLong(ByteBuffer buf, int position)
{
return 0L;
}
@Override
public void close()
{
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
}
}

View File

@ -19,12 +19,13 @@
package io.druid.query.aggregation.cardinality;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import io.druid.hll.HyperLogLogCollector;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.ColumnSelectorPlus;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategy;
import java.util.List;
@ -32,48 +33,59 @@ import java.util.List;
public class CardinalityAggregator implements Aggregator
{
private final String name;
private final List<ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>> selectorPlusList;
private final ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>[] selectorPluses;
private final boolean byRow;
public static final HashFunction hashFn = Hashing.murmur3_128();
protected static void hashRow(
List<ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>> selectorPlusList,
static void hashRow(
ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>[] selectorPluses,
HyperLogLogCollector collector
)
{
final Hasher hasher = hashFn.newHasher();
for (int k = 0; k < selectorPlusList.size(); ++k) {
for (int k = 0; k < selectorPluses.length; ++k) {
if (k != 0) {
hasher.putByte((byte) 0);
}
ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy> selectorPlus = selectorPlusList.get(k);
ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy> selectorPlus = selectorPluses[k];
selectorPlus.getColumnSelectorStrategy().hashRow(selectorPlus.getSelector(), hasher);
}
collector.add(hasher.hash().asBytes());
}
protected static void hashValues(
List<ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>> selectorPlusList,
static void hashValues(
ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>[] selectorPluses,
HyperLogLogCollector collector
)
{
for (final ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy> selectorPlus : selectorPlusList) {
for (final ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy> selectorPlus : selectorPluses) {
selectorPlus.getColumnSelectorStrategy().hashValues(selectorPlus.getSelector(), collector);
}
}
private HyperLogLogCollector collector;
public CardinalityAggregator(
@VisibleForTesting
@SuppressWarnings("unchecked")
CardinalityAggregator(
String name,
List<ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>> selectorPlusList,
boolean byRow
)
{
this(name, selectorPlusList.toArray(new ColumnSelectorPlus[] {}), byRow);
}
CardinalityAggregator(
String name,
ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>[] selectorPluses,
boolean byRow
)
{
this.name = name;
this.selectorPlusList = selectorPlusList;
this.selectorPluses = selectorPluses;
this.collector = HyperLogLogCollector.makeLatestCollector();
this.byRow = byRow;
}
@ -82,9 +94,9 @@ public class CardinalityAggregator implements Aggregator
public void aggregate()
{
if (byRow) {
hashRow(selectorPlusList, collector);
hashRow(selectorPluses, collector);
} else {
hashValues(selectorPlusList, collector);
hashValues(selectorPluses, collector);
}
}
@ -115,7 +127,7 @@ public class CardinalityAggregator implements Aggregator
@Override
public Aggregator clone()
{
return new CardinalityAggregator(name, selectorPlusList, byRow);
return new CardinalityAggregator(name, selectorPluses, byRow);
}
@Override

View File

@ -31,8 +31,9 @@ import io.druid.query.ColumnSelectorPlus;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.Aggregators;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.aggregation.NoopAggregator;
import io.druid.query.aggregation.NoopBufferAggregator;
import io.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategy;
import io.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategyFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
@ -44,7 +45,6 @@ import org.apache.commons.codec.binary.Base64;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
@ -137,36 +137,34 @@ public class CardinalityAggregatorFactory extends AggregatorFactory
@Override
public Aggregator factorize(final ColumnSelectorFactory columnFactory)
{
List<ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>> selectorPlusList =
Arrays.asList(DimensionHandlerUtils.createColumnSelectorPluses(
ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>[] selectorPluses =
DimensionHandlerUtils.createColumnSelectorPluses(
STRATEGY_FACTORY,
fields,
columnFactory
));
);
if (selectorPlusList.isEmpty()) {
return Aggregators.noopAggregator();
if (selectorPluses.length == 0) {
return NoopAggregator.instance();
}
return new CardinalityAggregator(name, selectorPlusList, byRow);
return new CardinalityAggregator(name, selectorPluses, byRow);
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnFactory)
{
List<ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>> selectorPlusList =
Arrays.asList(DimensionHandlerUtils.createColumnSelectorPluses(
ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>[] selectorPluses =
DimensionHandlerUtils.createColumnSelectorPluses(
STRATEGY_FACTORY,
fields,
columnFactory
));
);
if (selectorPlusList.isEmpty()) {
return Aggregators.noopBufferAggregator();
if (selectorPluses.length == 0) {
return NoopBufferAggregator.instance();
}
return new CardinalityBufferAggregator(selectorPlusList, byRow);
return new CardinalityBufferAggregator(selectorPluses, byRow);
}
@Override

View File

@ -20,26 +20,26 @@
package io.druid.query.aggregation.cardinality;
import io.druid.hll.HyperLogLogCollector;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.ColumnSelectorPlus;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategy;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import java.nio.ByteBuffer;
import java.util.List;
public class CardinalityBufferAggregator implements BufferAggregator
{
private final List<ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>> selectorPlusList;
private final ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>[] selectorPluses;
private final boolean byRow;
private static final byte[] EMPTY_BYTES = HyperLogLogCollector.makeEmptyVersionedByteArray();
public CardinalityBufferAggregator(
List<ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>> selectorPlusList,
CardinalityBufferAggregator(
ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>[] selectorPluses,
boolean byRow
)
{
this.selectorPlusList = selectorPlusList;
this.selectorPluses = selectorPluses;
this.byRow = byRow;
}
@ -63,9 +63,9 @@ public class CardinalityBufferAggregator implements BufferAggregator
try {
final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf);
if (byRow) {
CardinalityAggregator.hashRow(selectorPlusList, collector);
CardinalityAggregator.hashRow(selectorPluses, collector);
} else {
CardinalityAggregator.hashValues(selectorPlusList, collector);
CardinalityAggregator.hashValues(selectorPluses, collector);
}
}
finally {
@ -102,4 +102,10 @@ public class CardinalityBufferAggregator implements BufferAggregator
{
// no resources to cleanup
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selectorPluses", selectorPluses);
}
}

View File

@ -30,6 +30,7 @@ import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.column.Column;
@ -148,6 +149,12 @@ public class DoubleFirstAggregatorFactory extends AggregatorFactory
buf.putDouble(position + Longs.BYTES, pair.rhs);
}
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
}
};
}
};

View File

@ -22,6 +22,7 @@ package io.druid.query.aggregation.first;
import com.google.common.primitives.Longs;
import io.druid.collections.SerializablePair;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.LongColumnSelector;
@ -79,4 +80,11 @@ public class DoubleFirstBufferAggregator implements BufferAggregator
{
// no resources to cleanup
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("timeSelector", timeSelector);
inspector.visit("valueSelector", valueSelector);
}
}

View File

@ -29,6 +29,7 @@ import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.column.Column;
@ -138,6 +139,12 @@ public class LongFirstAggregatorFactory extends AggregatorFactory
buf.putLong(position + Longs.BYTES, pair.rhs);
}
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
}
};
}
};

View File

@ -22,6 +22,7 @@ package io.druid.query.aggregation.first;
import com.google.common.primitives.Longs;
import io.druid.collections.SerializablePair;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.LongColumnSelector;
import java.nio.ByteBuffer;
@ -78,4 +79,11 @@ public class LongFirstBufferAggregator implements BufferAggregator
{
// no resources to cleanup
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("timeSelector", timeSelector);
inspector.visit("valueSelector", valueSelector);
}
}

View File

@ -28,8 +28,9 @@ import io.druid.java.util.common.StringUtils;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.Aggregators;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.aggregation.NoopAggregator;
import io.druid.query.aggregation.NoopBufferAggregator;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.ObjectColumnSelector;
import org.apache.commons.codec.binary.Base64;
@ -85,7 +86,7 @@ public class HyperUniquesAggregatorFactory extends AggregatorFactory
ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(fieldName);
if (selector == null) {
return Aggregators.noopAggregator();
return NoopAggregator.instance();
}
final Class classOfObject = selector.classOfObject();
@ -104,7 +105,7 @@ public class HyperUniquesAggregatorFactory extends AggregatorFactory
ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(fieldName);
if (selector == null) {
return Aggregators.noopBufferAggregator();
return NoopBufferAggregator.instance();
}
final Class classOfObject = selector.classOfObject();

View File

@ -21,6 +21,7 @@ package io.druid.query.aggregation.hyperloglog;
import io.druid.hll.HyperLogLogCollector;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.ObjectColumnSelector;
import java.nio.ByteBuffer;
@ -102,4 +103,10 @@ public class HyperUniquesBufferAggregator implements BufferAggregator
{
// no resources to cleanup
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
}
}

View File

@ -32,6 +32,7 @@ import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
import io.druid.query.aggregation.first.LongFirstAggregatorFactory;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.column.Column;
@ -131,6 +132,12 @@ public class DoubleLastAggregatorFactory extends AggregatorFactory
buf.putDouble(position + Longs.BYTES, pair.rhs);
}
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
}
};
}
};

View File

@ -22,6 +22,7 @@ package io.druid.query.aggregation.last;
import com.google.common.primitives.Longs;
import io.druid.collections.SerializablePair;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.LongColumnSelector;
@ -79,4 +80,11 @@ public class DoubleLastBufferAggregator implements BufferAggregator
{
// no resources to cleanup
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("timeSelector", timeSelector);
inspector.visit("valueSelector", valueSelector);
}
}

View File

@ -31,6 +31,7 @@ import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
import io.druid.query.aggregation.first.LongFirstAggregatorFactory;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.column.Column;
@ -131,6 +132,12 @@ public class LongLastAggregatorFactory extends AggregatorFactory
buf.putLong(position + Longs.BYTES, pair.rhs);
}
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
}
};
}
};

View File

@ -22,6 +22,7 @@ package io.druid.query.aggregation.last;
import com.google.common.primitives.Longs;
import io.druid.collections.SerializablePair;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.LongColumnSelector;
import java.nio.ByteBuffer;
@ -78,4 +79,11 @@ public class LongLastBufferAggregator implements BufferAggregator
{
// no resources to cleanup
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("timeSelector", timeSelector);
inspector.visit("valueSelector", valueSelector);
}
}

View File

@ -24,6 +24,7 @@ import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import io.druid.java.util.common.IAE;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.DimensionSelector;
import io.druid.segment.DimensionSelectorUtils;
import io.druid.segment.IdLookup;
@ -173,4 +174,11 @@ final class ForwardingFilteredDimensionSelector implements DimensionSelector, Id
{
return forwardMapping.get(baseIdLookup.lookupId(name));
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
inspector.visit("forwardMapping", forwardMapping);
}
}

View File

@ -21,6 +21,7 @@ package io.druid.query.dimension;
import com.google.common.base.Predicate;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.DimensionSelector;
import io.druid.segment.IdLookup;
import io.druid.segment.data.ArrayBasedIndexedInts;
@ -132,4 +133,11 @@ final class PredicateFilteredDimensionSelector implements DimensionSelector
{
return selector.idLookup();
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
inspector.visit("predicate", predicate);
}
}

View File

@ -27,6 +27,7 @@ import io.druid.data.input.Row;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.extraction.ExtractionFn;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.DimensionSelector;
import io.druid.segment.FloatColumnSelector;
@ -164,6 +165,13 @@ public class RowBasedColumnSelectorFactory implements ColumnSelectorFactory
{
return null;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("row", row);
inspector.visit("extractionFn", extractionFn);
}
};
} else {
return new DimensionSelector()
@ -289,6 +297,13 @@ public class RowBasedColumnSelectorFactory implements ColumnSelectorFactory
{
return null;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("row", row);
inspector.visit("extractionFn", extractionFn);
}
};
}
}
@ -296,17 +311,26 @@ public class RowBasedColumnSelectorFactory implements ColumnSelectorFactory
@Override
public FloatColumnSelector makeFloatColumnSelector(final String columnName)
{
abstract class RowBasedFloatColumnSelector implements FloatColumnSelector
{
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("row", row);
}
}
if (columnName.equals(Column.TIME_COLUMN_NAME)) {
return new FloatColumnSelector()
class TimeFloatColumnSelector extends RowBasedFloatColumnSelector
{
@Override
public float get()
{
return (float) row.get().getTimestampFromEpoch();
}
};
}
return new TimeFloatColumnSelector();
} else {
return new FloatColumnSelector()
return new RowBasedFloatColumnSelector()
{
@Override
public float get()
@ -320,17 +344,26 @@ public class RowBasedColumnSelectorFactory implements ColumnSelectorFactory
@Override
public LongColumnSelector makeLongColumnSelector(final String columnName)
{
abstract class RowBasedLongColumnSelector implements LongColumnSelector
{
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("row", row);
}
}
if (columnName.equals(Column.TIME_COLUMN_NAME)) {
return new LongColumnSelector()
class TimeLongColumnSelector extends RowBasedLongColumnSelector
{
@Override
public long get()
{
return row.get().getTimestampFromEpoch();
}
};
}
return new TimeLongColumnSelector();
} else {
return new LongColumnSelector()
return new RowBasedLongColumnSelector()
{
@Override
public long get()

View File

@ -38,7 +38,7 @@ import io.druid.java.util.common.IAE;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.granularity.AllGranularity;
import io.druid.java.util.common.guava.Accumulator;
import io.druid.query.QueryInterruptedException;
import io.druid.query.BaseQuery;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.groupby.GroupByQuery;
@ -152,9 +152,7 @@ public class RowBasedGrouperHelper
final Row row
)
{
if (Thread.interrupted()) {
throw new QueryInterruptedException(new InterruptedException());
}
BaseQuery.checkInterrupted();
if (theGrouper == null) {
// Pass-through null returns without doing more work.

View File

@ -28,7 +28,7 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.query.QueryInterruptedException;
import io.druid.query.BaseQuery;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.ColumnSelectorFactory;
import net.jpountz.lz4.LZ4BlockInputStream;
@ -200,9 +200,7 @@ public class SpillingGrouper<KeyType> implements Grouper<KeyType>
outFile = out.getFile();
final Iterator<Entry<KeyType>> it = grouper.iterator(true);
while (it.hasNext()) {
if (Thread.interrupted()) {
throw new QueryInterruptedException(new InterruptedException());
}
BaseQuery.checkInterrupted();
jsonGenerator.writeObject(it.next());
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.monomorphicprocessing;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Annotation for methods, which are called from hot loops during query processing. Those methods and their downstream
* callees should be optimized as much as possible.
*
* <p>Currently CalledFromHotLoop is a source-level annotation, used only for documentation.
*
* @see HotLoopCallee
*/
@Documented
@Inherited
@Retention(RetentionPolicy.SOURCE)
@Target(ElementType.METHOD)
public @interface CalledFromHotLoop
{
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.monomorphicprocessing;
/**
* Marker interface for abstractions, which are called from hot loops during query processing. Some of the methods of
* interfaces extending HotLoopCallee should be annotated with {@link CalledFromHotLoop}.
*/
public interface HotLoopCallee
{
/**
* Implementations of this method should call {@code inspector.visit()} with all fields of this class, which meet two
* conditions:
* 1. They are used in methods of this class, annotated with {@link CalledFromHotLoop}
* 2. They are either:
* a. Nullable objects
* b. Instances of HotLoopCallee
* c. Objects, which don't always have a specific class in runtime. For example, a field of type {@link
* java.util.Set} could be {@link java.util.HashSet} or {@link java.util.TreeSet} in runtime, depending on how
* this instance (the instance on which inspectRuntimeShape() is called) is configured.
* d. ByteBuffer or similar objects, where byte order matters
* e. boolean flags, affecting branch taking
* f. Arrays of objects, meeting conditions any of conditions a-e.
*/
void inspectRuntimeShape(RuntimeShapeInspector inspector);
}

View File

@ -0,0 +1,44 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.monomorphicprocessing;
import javax.annotation.Nullable;
/**
* @see HotLoopCallee#inspectRuntimeShape(RuntimeShapeInspector)
*/
public interface RuntimeShapeInspector
{
void visit(String fieldName, @Nullable HotLoopCallee value);
void visit(String fieldName, @Nullable Object value);
<T> void visit(String fieldName, T[] values);
void visit(String flagName, boolean flagValue);
/**
* To be called from {@link HotLoopCallee#inspectRuntimeShape(RuntimeShapeInspector)} with something, that is
* important to ensure monomorphism and predictable branch taking in hot loops, but doesn't apply to other visit()
* methods in RuntimeShapeInspector. For example, {@link io.druid.segment.BitmapOffset#inspectRuntimeShape} reports
* bitmap population via this method, to ensure predictable branch taking inside Bitmap's iterators.
*/
void visit(String key, String runtimeShape);
}

View File

@ -0,0 +1,401 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.monomorphicprocessing;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import io.druid.concurrent.Execs;
import io.druid.java.util.common.logger.Logger;
import org.objectweb.asm.ClassReader;
import org.objectweb.asm.ClassVisitor;
import org.objectweb.asm.ClassWriter;
import org.objectweb.asm.commons.ClassRemapper;
import org.objectweb.asm.commons.SimpleRemapper;
import sun.misc.Unsafe;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
* Manages class specialization during query processing.
* Usage:
*
* String runtimeShape = stringRuntimeShape.of(bufferAggregator);
* SpecializationState<ProcessingAlgorithm> specializationState = SpecializationService.getSpecializationState(
* ProcessingAlgorithmImpl.class,
* runtimeShape
* );
* ProcessingAlgorithm algorithm = specializationState.getSpecializedOrDefault(new ProcessingAlgorithmImpl());
* long loopIterations = new ProcessingAlgorithmImpl().run(bufferAggregator, ...);
* specializationState.accountLoopIterations(loopIterations);
*
* ProcessingAlgorithmImpl.class, passed as prototypeClass to {@link #getSpecializationState} methods must have public
* no-arg constructor and must be stateless (no fields).
*
* @see SpecializationState
*/
public final class SpecializationService
{
private static final Logger LOG = new Logger(SpecializationService.class);
private static final Unsafe UNSAFE;
static {
try {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
UNSAFE = (Unsafe) theUnsafe.get(null);
}
catch (Exception e) {
throw new RuntimeException("Cannot access Unsafe methods", e);
}
}
/**
* If true, specialization is not actually done, an instance of prototypeClass is used as a "specialized" instance.
* Useful for analysis of generated assembly with JITWatch (https://github.com/AdoptOpenJDK/jitwatch), because
* JITWatch shows only classes present in the loaded JAR (prototypeClass should be), not classes generated during
* runtime.
*/
private static final boolean fakeSpecialize = Boolean.getBoolean("fakeSpecialize");
/**
* Number of loop iterations, accounted via {@link SpecializationState#accountLoopIterations(long)} in
* {@link WindowedLoopIterationCounter} during the last hour window, after which WindowedLoopIterationCounter decides
* to specialize class for the specific runtimeShape. The default value is chosen to be so that the specialized
* class will likely be compiled with C2 HotSpot compiler with the default values of *BackEdgeThreshold options.
*/
private static final int triggerSpecializationIterationsThreshold =
Integer.getInteger("triggerSpecializationIterationsThreshold", 10_000);
/**
* The maximum number of specializations, that this service is allowed to make. It's not unlimited because each
* specialization takes some JVM memory (machine code cache, byte code, etc.)
*/
private static final int maxSpecializations = Integer.getInteger("maxSpecializations", 1000);
private static final ExecutorService classSpecializationExecutor = Execs.singleThreaded("class-specialization-%d");
private static final AtomicLong specializedClassCounter = new AtomicLong();
private static final ClassValue<PerPrototypeClassState> perPrototypeClassState =
new ClassValue<PerPrototypeClassState>()
{
@Override
protected PerPrototypeClassState computeValue(Class<?> type)
{
return new PerPrototypeClassState<>(type);
}
};
/**
* @param <T> type of query processing algorithm
* @see SpecializationService class-level javadoc for details
*/
public static <T> SpecializationState<T> getSpecializationState(
Class<? extends T> prototypeClass,
String runtimeShape
)
{
return getSpecializationState(prototypeClass, runtimeShape, ImmutableMap.<Class<?>, Class<?>>of());
}
/**
* @param classRemapping classes, that should be replaced in the bytecode of the given prototypeClass when specialized
* @see #getSpecializationState(Class, String)
*/
@SuppressWarnings("unchecked")
public static <T> SpecializationState<T> getSpecializationState(
Class<? extends T> prototypeClass,
String runtimeShape,
ImmutableMap<Class<?>, Class<?>> classRemapping
)
{
return perPrototypeClassState.get(prototypeClass).getSpecializationState(runtimeShape, classRemapping);
}
static class PerPrototypeClassState<T>
{
private final Class<T> prototypeClass;
private final ConcurrentMap<SpecializationId, SpecializationState<T>> specializationStates =
new ConcurrentHashMap<>();
private final String prototypeClassBytecodeName;
private final String specializedClassNamePrefix;
private byte[] prototypeClassBytecode;
PerPrototypeClassState(Class<T> prototypeClass)
{
this.prototypeClass = prototypeClass;
String prototypeClassName = prototypeClass.getName();
prototypeClassBytecodeName = classBytecodeName(prototypeClassName);
specializedClassNamePrefix = prototypeClassName + "$Copy";
}
SpecializationState<T> getSpecializationState(String runtimeShape, ImmutableMap<Class<?>, Class<?>> classRemapping)
{
SpecializationId specializationId = new SpecializationId(runtimeShape, classRemapping);
SpecializationState<T> alreadyExistingState = specializationStates.get(specializationId);
if (alreadyExistingState != null) {
return alreadyExistingState;
}
return specializationStates.computeIfAbsent(specializationId, id -> new WindowedLoopIterationCounter<>(this, id));
}
T specialize(ImmutableMap<Class<?>, Class<?>> classRemapping)
{
String specializedClassName = specializedClassNamePrefix + specializedClassCounter.get();
ClassWriter specializedClassWriter = new ClassWriter(0);
SimpleRemapper remapper = new SimpleRemapper(createRemapping(classRemapping, specializedClassName));
ClassVisitor classTransformer = new ClassRemapper(specializedClassWriter, remapper);
try {
ClassReader prototypeClassReader = new ClassReader(getPrototypeClassBytecode());
prototypeClassReader.accept(classTransformer, 0);
byte[] specializedClassBytecode = specializedClassWriter.toByteArray();
Class<T> specializedClass = defineClass(specializedClassName, specializedClassBytecode);
specializedClassCounter.incrementAndGet();
return specializedClass.newInstance();
}
catch (InstantiationException | IllegalAccessException | IOException e) {
throw new RuntimeException(e);
}
}
private HashMap<String, String> createRemapping(
ImmutableMap<Class<?>, Class<?>> classRemapping,
String specializedClassName
)
{
HashMap<String, String> remapping = new HashMap<>();
remapping.put(prototypeClassBytecodeName, classBytecodeName(specializedClassName));
for (Map.Entry<Class<?>, Class<?>> classRemappingEntry : classRemapping.entrySet()) {
Class<?> sourceClass = classRemappingEntry.getKey();
Class<?> remappingClass = classRemappingEntry.getValue();
remapping.put(classBytecodeName(sourceClass.getName()), classBytecodeName(remappingClass.getName()));
}
return remapping;
}
@SuppressWarnings("unchecked")
private Class<T> defineClass(String specializedClassName, byte[] specializedClassBytecode)
{
return (Class<T>) UNSAFE.defineClass(
specializedClassName,
specializedClassBytecode,
0,
specializedClassBytecode.length,
prototypeClass.getClassLoader(),
prototypeClass.getProtectionDomain()
);
}
/**
* No synchronization, because {@link #specialize} is called only from {@link #classSpecializationExecutor}, i. e.
* from a single thread.
*/
byte[] getPrototypeClassBytecode() throws IOException
{
if (prototypeClassBytecode == null) {
ClassLoader cl = prototypeClass.getClassLoader();
try (InputStream prototypeClassBytecodeStream =
cl.getResourceAsStream(prototypeClassBytecodeName + ".class")) {
prototypeClassBytecode = ByteStreams.toByteArray(prototypeClassBytecodeStream);
}
}
return prototypeClassBytecode;
}
private static String classBytecodeName(String className)
{
return className.replace('.', '/');
}
}
private static class SpecializationId
{
private final String runtimeShape;
private final ImmutableMap<Class<?>, Class<?>> classRemapping;
private final int hashCode;
private SpecializationId(String runtimeShape, ImmutableMap<Class<?>, Class<?>> classRemapping)
{
this.runtimeShape = runtimeShape;
this.classRemapping = classRemapping;
this.hashCode = runtimeShape.hashCode() * 1000003 + classRemapping.hashCode();
}
@Override
public boolean equals(Object obj)
{
if (!(obj instanceof SpecializationId)) {
return false;
}
SpecializationId other = (SpecializationId) obj;
return runtimeShape.equals(other.runtimeShape) && classRemapping.equals(other.classRemapping);
}
@Override
public int hashCode()
{
return hashCode;
}
}
/**
* Accumulates the number of iterations during the last hour. (Window size = 1 hour)
*/
static class WindowedLoopIterationCounter<T> extends SpecializationState<T> implements Runnable
{
private final PerPrototypeClassState<T> perPrototypeClassState;
private final SpecializationId specializationId;
/** A map with the number of iterations per each minute during the last hour */
private final ConcurrentMap<Long, AtomicLong> perMinuteIterations = new ConcurrentHashMap<>();
private final AtomicBoolean specializationScheduled = new AtomicBoolean(false);
WindowedLoopIterationCounter(
PerPrototypeClassState<T> perPrototypeClassState,
SpecializationId specializationId
)
{
this.perPrototypeClassState = perPrototypeClassState;
this.specializationId = specializationId;
}
@Nullable
@Override
public T getSpecialized()
{
return null;
}
@Override
public void accountLoopIterations(long loopIterations)
{
if (specializationScheduled.get()) {
return;
}
if (loopIterations > triggerSpecializationIterationsThreshold ||
addAndGetTotalIterationsOverTheLastHour(loopIterations) > triggerSpecializationIterationsThreshold) {
if (specializationScheduled.compareAndSet(false, true)) {
classSpecializationExecutor.submit(this);
}
}
}
private long addAndGetTotalIterationsOverTheLastHour(long newIterations)
{
long currentMillis = System.currentTimeMillis();
long currentMinute = TimeUnit.MILLISECONDS.toMinutes(currentMillis);
long minuteOneHourAgo = currentMinute - TimeUnit.HOURS.toMinutes(1);
long totalIterations = 0;
boolean currentMinutePresent = false;
for (Iterator<Map.Entry<Long, AtomicLong>> it = perMinuteIterations.entrySet().iterator(); it.hasNext(); ) {
Map.Entry<Long, AtomicLong> minuteStats = it.next();
long minute = minuteStats.getKey();
if (minute < minuteOneHourAgo) {
it.remove();
} else if (minute == currentMinute) {
totalIterations += minuteStats.getValue().addAndGet(newIterations);
currentMinutePresent = true;
} else {
totalIterations += minuteStats.getValue().get();
}
}
if (!currentMinutePresent) {
perMinuteIterations.computeIfAbsent(currentMinute, AtomicLong::new).addAndGet(newIterations);
totalIterations += newIterations;
}
return totalIterations;
}
@Override
public void run()
{
try {
T specialized;
if (specializedClassCounter.get() > maxSpecializations) {
// Don't specialize, just instantiate the prototype class and emit a warning.
// The "better" approach is probably to implement some kind of cache eviction from
// PerPrototypeClassState.specializationStates. But it might be that nobody ever hits even the current
// maxSpecializations limit, so implementing cache eviction is an unnecessary complexity.
specialized = perPrototypeClassState.prototypeClass.newInstance();
LOG.warn(
"SpecializationService couldn't make more than [%d] specializations. "
+ "Not doing specialization for runtime shape[%s] and class remapping[%s], using the prototype class[%s]",
maxSpecializations,
specializationId.runtimeShape,
specializationId.classRemapping,
perPrototypeClassState.prototypeClass
);
} else if (fakeSpecialize) {
specialized = perPrototypeClassState.prototypeClass.newInstance();
} else {
specialized = perPrototypeClassState.specialize(specializationId.classRemapping);
}
perPrototypeClassState.specializationStates.put(specializationId, new Specialized<>(specialized));
}
catch (Exception e) {
LOG.error(
e,
"Error specializing prototype class[%s] for runtime shape[%s] and class remapping[%s]",
perPrototypeClassState.prototypeClass,
specializationId.runtimeShape,
specializationId.classRemapping
);
}
}
}
static class Specialized<T> extends SpecializationState<T>
{
private final T specialized;
Specialized(T specialized)
{
this.specialized = specialized;
}
@Override
public T getSpecialized()
{
return specialized;
}
@Override
public void accountLoopIterations(long loopIterations)
{
// do nothing
}
}
private SpecializationService()
{
}
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.monomorphicprocessing;
import javax.annotation.Nullable;
/**
* @param <T> type of query processing algorithm
* @see SpecializationService
*/
public abstract class SpecializationState<T>
{
/**
* Returns an instance of specialized version of query processing algorithm, if available, null otherwise.
*/
@Nullable
public abstract T getSpecialized();
/**
* Returns an instance of specialized version of query processing algorithm, if available, defaultInstance otherwise.
*/
public final T getSpecializedOrDefault(T defaultInstance)
{
T specialized = getSpecialized();
return specialized != null ? specialized : defaultInstance;
}
/**
* Accounts the number of loop iterations, made when processing queries without specialized algorithm, i. e. after
* {@link #getSpecialized()} returned null. If sufficiently many loop iterations were made, {@link
* SpecializationService} decides that the algorithm is worth to be specialized, and {@link #getSpecialized()} will
* return non-null during later queries.
*/
public abstract void accountLoopIterations(long loopIterations);
}

View File

@ -0,0 +1,193 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.monomorphicprocessing;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
/**
* Class to be used to obtain String representation of runtime shape of one or several {@link HotLoopCallee}s.
* Example:
*
* String offsetShape = StringRuntimeShape.of(offset);
* String dimensionSelectorShape = StringRuntimeShape.of(dimensionSelector);
*/
public final class StringRuntimeShape
{
public static String of(HotLoopCallee hotLoopCallee)
{
return new Inspector().runtimeShapeOf(hotLoopCallee);
}
public static String of(HotLoopCallee... hotLoopCallees)
{
return new Inspector().runtimeShapeOf(hotLoopCallees);
}
private StringRuntimeShape()
{
}
private static class Inspector implements RuntimeShapeInspector
{
private final StringBuilder sb = new StringBuilder();
private int indent = 0;
String runtimeShapeOf(HotLoopCallee hotLoopCallee)
{
visit(hotLoopCallee);
return sb.toString();
}
String runtimeShapeOf(HotLoopCallee[] hotLoopCallees)
{
for (HotLoopCallee hotLoopCallee : hotLoopCallees) {
visit(hotLoopCallee);
sb.append(",\n");
}
sb.setLength(sb.length() - 2);
return sb.toString();
}
private void indent()
{
for (int i = 0; i < indent; i++) {
sb.append(' ');
}
}
private void incrementIndent()
{
indent += 2;
}
private void decrementIndent()
{
indent -= 2;
}
private void visit(@Nullable Object value)
{
if (value == null) {
sb.append("null");
return;
}
sb.append(value.getClass().getName());
if (value instanceof HotLoopCallee) {
appendHotLoopCalleeShape((HotLoopCallee) value);
} else if (value instanceof ByteBuffer) {
// ByteBuffers are treated specially because the byte order is an important part of the runtime shape.
appendByteBufferShape((ByteBuffer) value);
}
}
private void appendHotLoopCalleeShape(HotLoopCallee value)
{
sb.append(" {\n");
int lengthBeforeInspection = sb.length();
incrementIndent();
value.inspectRuntimeShape(this);
decrementIndent();
if (sb.length() == lengthBeforeInspection) {
// remove " {\n"
sb.setLength(lengthBeforeInspection - 3);
} else {
removeLastComma();
indent();
sb.append('}');
}
}
private void appendByteBufferShape(ByteBuffer byteBuffer)
{
sb.append(" {order: ");
sb.append(byteBuffer.order().toString());
sb.append('}');
}
private void removeLastComma()
{
assert sb.charAt(sb.length() - 2) == ',' && sb.charAt(sb.length() - 1) == '\n';
sb.setCharAt(sb.length() - 2, '\n');
sb.setLength(sb.length() - 1);
}
@Override
public void visit(String fieldName, @Nullable HotLoopCallee value)
{
visit(fieldName, (Object) value);
}
@Override
public void visit(String fieldName, @Nullable Object value)
{
indent();
sb.append(fieldName);
sb.append(": ");
visit(value);
sb.append(",\n");
}
@Override
public <T> void visit(String fieldName, T[] values)
{
indent();
sb.append(fieldName);
sb.append(": [\n");
int lengthBeforeInspection = sb.length();
incrementIndent();
for (T value : values) {
indent();
visit(value);
sb.append(",\n");
}
decrementIndent();
if (sb.length() == lengthBeforeInspection) {
sb.setCharAt(lengthBeforeInspection - 1, ']');
} else {
removeLastComma();
indent();
sb.append(']');
}
sb.append(",\n");
}
@Override
public void visit(String flagName, boolean flagValue)
{
indent();
sb.append(flagName);
sb.append(": ");
sb.append(flagValue);
sb.append(",\n");
}
@Override
public void visit(String key, String runtimeShape)
{
indent();
sb.append(key);
sb.append(": ");
sb.append(runtimeShape);
sb.append(",\n");
}
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.topn;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import java.nio.ByteBuffer;
public interface Generic1AggPooledTopNScanner
{
/**
* @param aggregatorSize number of bytes required by aggregator for a single aggregation
* @param positions a cache for positions in resultsBuffer, where specific (indexed) dimension values are aggregated
* @return number of scanned rows, i. e. number of steps made with the given cursor
*/
long scanAndAggregate(
DimensionSelector dimensionSelector,
BufferAggregator aggregator,
int aggregatorSize,
Cursor cursor,
int[] positions,
ByteBuffer resultsBuffer
);
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.topn;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.data.IndexedInts;
import java.nio.ByteBuffer;
public final class Generic1AggPooledTopNScannerPrototype implements Generic1AggPooledTopNScanner
{
@Override
public long scanAndAggregate(
DimensionSelector dimensionSelector,
BufferAggregator aggregator,
int aggregatorSize,
Cursor cursor,
int[] positions,
ByteBuffer resultsBuffer
)
{
long scannedRows = 0;
int positionToAllocate = 0;
while (!cursor.isDoneOrInterrupted()) {
final IndexedInts dimValues = dimensionSelector.getRow();
final int dimSize = dimValues.size();
for (int i = 0; i < dimSize; i++) {
int dimIndex = dimValues.get(i);
int position = positions[dimIndex];
if (position >= 0) {
aggregator.aggregate(resultsBuffer, position);
} else if (position == TopNAlgorithm.INIT_POSITION_VALUE) {
positions[dimIndex] = positionToAllocate;
position = positionToAllocate;
aggregator.init(resultsBuffer, position);
aggregator.aggregate(resultsBuffer, position);
positionToAllocate += aggregatorSize;
}
}
scannedRows++;
cursor.advanceUninterruptibly();
}
return scannedRows;
}
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.topn;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import java.nio.ByteBuffer;
public interface Generic2AggPooledTopNScanner
{
/**
* @param aggregator1Size number of bytes required by aggregator1 for a single aggregation
* @param aggregator2Size number of bytes required by aggregator2 for a single aggregation
* @param positions a cache for positions in resultsBuffer, where specific (indexed) dimension values are aggregated
* @return number of scanned rows, i. e. number of steps made with the given cursor
*/
long scanAndAggregate(
DimensionSelector dimensionSelector,
BufferAggregator aggregator1,
int aggregator1Size,
BufferAggregator aggregator2,
int aggregator2Size,
Cursor cursor,
int[] positions,
ByteBuffer resultsBuffer
);
}

View File

@ -0,0 +1,71 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.topn;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.data.IndexedInts;
import java.nio.ByteBuffer;
public final class Generic2AggPooledTopNScannerPrototype implements Generic2AggPooledTopNScanner
{
@Override
public long scanAndAggregate(
DimensionSelector dimensionSelector,
BufferAggregator aggregator1,
int aggregator1Size,
BufferAggregator aggregator2,
int aggregator2Size,
Cursor cursor,
int[] positions,
ByteBuffer resultsBuffer
)
{
int totalAggregatorsSize = aggregator1Size + aggregator2Size;
long scannedRows = 0;
int positionToAllocate = 0;
while (!cursor.isDoneOrInterrupted()) {
final IndexedInts dimValues = dimensionSelector.getRow();
final int dimSize = dimValues.size();
for (int i = 0; i < dimSize; i++) {
int dimIndex = dimValues.get(i);
int position = positions[dimIndex];
if (position >= 0) {
aggregator1.aggregate(resultsBuffer, position);
aggregator2.aggregate(resultsBuffer, position + aggregator1Size);
} else if (position == TopNAlgorithm.INIT_POSITION_VALUE) {
positions[dimIndex] = positionToAllocate;
position = positionToAllocate;
aggregator1.init(resultsBuffer, position);
aggregator1.aggregate(resultsBuffer, position);
position += aggregator1Size;
aggregator2.init(resultsBuffer, position);
aggregator2.aggregate(resultsBuffer, position);
positionToAllocate += totalAggregatorsSize;
}
}
scannedRows++;
cursor.advanceUninterruptibly();
}
return scannedRows;
}
}

View File

@ -19,13 +19,18 @@
package io.druid.query.topn;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidPool;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.BaseQuery;
import io.druid.query.ColumnSelectorPlus;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.monomorphicprocessing.SpecializationService;
import io.druid.query.monomorphicprocessing.SpecializationState;
import io.druid.query.monomorphicprocessing.StringRuntimeShape;
import io.druid.segment.Capabilities;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
@ -40,6 +45,19 @@ import java.util.Arrays;
public class PooledTopNAlgorithm
extends BaseTopNAlgorithm<int[], BufferAggregator[], PooledTopNAlgorithm.PooledTopNParams>
{
/** Non-final fields for testing, see TopNQueryRunnerTest */
@VisibleForTesting
static boolean specializeGeneric1AggPooledTopN =
!Boolean.getBoolean("dontSpecializeGeneric1AggPooledTopN");
@VisibleForTesting
static boolean specializeGeneric2AggPooledTopN =
!Boolean.getBoolean("dontSpecializeGeneric2AggPooledTopN");
private static final Generic1AggPooledTopNScanner defaultGeneric1AggScanner =
new Generic1AggPooledTopNScannerPrototype();
private static final Generic2AggPooledTopNScanner defaultGeneric2AggScanner =
new Generic2AggPooledTopNScannerPrototype();
private final Capabilities capabilities;
private final TopNQuery query;
private final StupidPool<ByteBuffer> bufferPool;
@ -166,6 +184,75 @@ public class PooledTopNAlgorithm
{
return makeBufferAggregators(params.getCursor(), query.getAggregatorSpecs());
}
@Override
protected void scanAndAggregate(
final PooledTopNParams params,
final int[] positions,
final BufferAggregator[] theAggregators,
final int numProcessed
)
{
final Cursor cursor = params.getCursor();
if (specializeGeneric1AggPooledTopN && theAggregators.length == 1) {
scanAndAggregateGeneric1Agg(params, positions, theAggregators[0], cursor);
} else if (specializeGeneric2AggPooledTopN && theAggregators.length == 2) {
scanAndAggregateGeneric2Agg(params, positions, theAggregators, cursor);
} else {
scanAndAggregateDefault(params, positions, theAggregators);
}
BaseQuery.checkInterrupted();
}
private static void scanAndAggregateGeneric1Agg(
PooledTopNParams params,
int[] positions,
BufferAggregator aggregator,
Cursor cursor
)
{
String runtimeShape = StringRuntimeShape.of(aggregator);
Class<? extends Generic1AggPooledTopNScanner> prototypeClass = Generic1AggPooledTopNScannerPrototype.class;
SpecializationState<Generic1AggPooledTopNScanner> specializationState = SpecializationService
.getSpecializationState(prototypeClass, runtimeShape);
Generic1AggPooledTopNScanner scanner = specializationState.getSpecializedOrDefault(defaultGeneric1AggScanner);
long scannedRows = scanner.scanAndAggregate(
params.getDimSelector(),
aggregator,
params.getAggregatorSizes()[0],
cursor,
positions,
params.getResultsBuf()
);
specializationState.accountLoopIterations(scannedRows);
}
private static void scanAndAggregateGeneric2Agg(
PooledTopNParams params,
int[] positions,
BufferAggregator[] theAggregators,
Cursor cursor
)
{
String runtimeShape = StringRuntimeShape.of(theAggregators);
Class<? extends Generic2AggPooledTopNScanner> prototypeClass = Generic2AggPooledTopNScannerPrototype.class;
SpecializationState<Generic2AggPooledTopNScanner> specializationState = SpecializationService
.getSpecializationState(prototypeClass, runtimeShape);
Generic2AggPooledTopNScanner scanner = specializationState.getSpecializedOrDefault(defaultGeneric2AggScanner);
int[] aggregatorSizes = params.getAggregatorSizes();
long scannedRows = scanner.scanAndAggregate(
params.getDimSelector(),
theAggregators[0],
aggregatorSizes[0],
theAggregators[1],
aggregatorSizes[1],
cursor,
positions,
params.getResultsBuf()
);
specializationState.accountLoopIterations(scannedRows);
}
/**
* Use aggressive loop unrolling to aggregate the data
*
@ -184,12 +271,10 @@ public class PooledTopNAlgorithm
* still optimizes the high quantity of aggregate queries which benefit greatly from any speed improvements
* (they simply take longer to start with).
*/
@Override
protected void scanAndAggregate(
private static void scanAndAggregateDefault(
final PooledTopNParams params,
final int[] positions,
final BufferAggregator[] theAggregators,
final int numProcessed
final BufferAggregator[] theAggregators
)
{
if (params.getCardinality() < 0) {
@ -211,8 +296,7 @@ public class PooledTopNAlgorithm
final int aggSize = theAggregators.length;
final int aggExtra = aggSize % AGG_UNROLL_COUNT;
int currentPosition = 0;
while (!cursor.isDone()) {
while (!cursor.isDoneOrInterrupted()) {
final IndexedInts dimValues = dimSelector.getRow();
final int dimSize = dimValues.size();
@ -393,7 +477,7 @@ public class PooledTopNAlgorithm
currentPosition
);
}
cursor.advance();
cursor.advanceUninterruptibly();
}
}

View File

@ -310,9 +310,10 @@ public class TopNQueryBuilder
return this;
}
public TopNQueryBuilder aggregators(List<AggregatorFactory> a)
@SuppressWarnings("unchecked")
public TopNQueryBuilder aggregators(List<? extends AggregatorFactory> a)
{
aggregatorSpecs = a;
aggregatorSpecs = (List<AggregatorFactory>) a;
return this;
}

View File

@ -23,10 +23,17 @@ import io.druid.collections.bitmap.BitmapFactory;
import io.druid.collections.bitmap.ImmutableBitmap;
import io.druid.collections.bitmap.MutableBitmap;
import io.druid.collections.bitmap.WrappedImmutableRoaringBitmap;
import io.druid.collections.bitmap.WrappedRoaringBitmap;
import io.druid.extendedset.intset.EmptyIntIterator;
import io.druid.java.util.common.RE;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.data.Offset;
import io.druid.segment.data.RoaringBitmapSerdeFactory;
import org.roaringbitmap.IntIterator;
import java.util.Arrays;
import java.util.HashSet;
/**
*/
public class BitmapOffset implements Offset
@ -34,12 +41,93 @@ public class BitmapOffset implements Offset
private static final int INVALID_VALUE = -1;
private static final BitmapFactory ROARING_BITMAP_FACTORY = new RoaringBitmapSerdeFactory(false).getBitmapFactory();
private final IntIterator itr;
private final BitmapFactory bitmapFactory;
private final ImmutableBitmap bitmapIndex;
private final boolean descending;
/**
* Currently the default stops are not consciously optimized for the goals described in {@link #factorizeFullness}.
* They are chosen intuitively. There was no experimentation with different bitmapFullnessFactorizationStops.
* Experimentation and performance feedback with a different set of stops is welcome.
*/
private static final String DEFAULT_FULLNESS_FACTORIZATION_STOPS = "0.01,0.1,0.3,0.5,0.7,0.9,0.99";
private static final double[] BITMAP_FULLNESS_FACTORIZATION_STOPS;
private static final String[] FACTORIZED_FULLNESS;
static
{
String stopString = System.getProperty("bitmapFullnessFactorizationStops", DEFAULT_FULLNESS_FACTORIZATION_STOPS);
String[] stopsArray = stopString.split(",");
if (stopsArray.length == 0) {
throw new RE("Empty bitmapFullnessFactorizationStops: " + stopString);
}
if (new HashSet<>(Arrays.asList(stopsArray)).size() != stopsArray.length) {
throw new RE("Non unique bitmapFullnessFactorizationStops: " + stopString);
}
private volatile int val;
BITMAP_FULLNESS_FACTORIZATION_STOPS = new double[stopsArray.length];
for (int i = 0; i < stopsArray.length; i++) {
String stop = stopsArray[i];
BITMAP_FULLNESS_FACTORIZATION_STOPS[i] = Double.parseDouble(stop);
}
Arrays.sort(BITMAP_FULLNESS_FACTORIZATION_STOPS);
double firstStop = BITMAP_FULLNESS_FACTORIZATION_STOPS[0];
if (Double.isNaN(firstStop) || firstStop <= 0.0) {
throw new RE("First bitmapFullnessFactorizationStop[%d] should be > 0", firstStop);
}
double lastStop = BITMAP_FULLNESS_FACTORIZATION_STOPS[stopsArray.length - 1];
if (Double.isNaN(lastStop) || lastStop >= 1) {
throw new RE("Last bitmapFullnessFactorizationStop[%d] should be < 1", lastStop);
}
String prevStop = "0";
FACTORIZED_FULLNESS = new String[stopsArray.length + 1];
for (int i = 0; i < stopsArray.length; i++) {
String stop = String.valueOf(BITMAP_FULLNESS_FACTORIZATION_STOPS[i]);
FACTORIZED_FULLNESS[i] = "(" + prevStop + ", " + stop + "]";
prevStop = stop;
}
FACTORIZED_FULLNESS[stopsArray.length] = "(" + prevStop + ", 1)";
}
/**
* Processing of queries with BitmapOffsets, whose Bitmaps has different factorized fullness (bucket), reported from
* this method, uses different copies of the same code, so JIT compiler analyzes and compiles the code for different
* factorized fullness separately. The goal is to capture frequency of abstraction usage in compressed bitmap
* algorithms, i. e.
* - "Zero sequence" vs. "Literal" vs. "One sequence" in {@link io.druid.extendedset.intset.ImmutableConciseSet}
* - {@link org.roaringbitmap.ArrayContainer} vs {@link org.roaringbitmap.BitmapContainer} in Roaring
* and then https://shipilev.net/blog/2015/black-magic-method-dispatch/ comes into play. The secondary goal is to
* capture HotSpot's thresholds, which it uses to compile conditional blocks differently inside bitmap impls. See
* https://bugs.openjdk.java.net/browse/JDK-6743900. The default BlockLayoutMinDiamondPercentage=20, i. e. if
* probability of taking some branch is less than 20%, it is moved out of the hot path (to save some icache?).
*
* On the other hand, we don't want to factor fullness into too small pieces, because
* - too little queries may fall into those small buckets, and they are not compiled with Hotspot's C2 compiler
* - if there are a lot of queries for each small factorized fullness and their copies of the code is compiled by
* C2, this pollutes code cache and takes time to perform too many compilations, while some of them likely produce
* identical code.
*
* Ideally there should be as much buckets as possible as long as Hotspot's C2 output for each bucket is different.
*/
private static String factorizeFullness(long bitmapCardinality, long numRows)
{
if (bitmapCardinality == 0) {
return "0";
} else if (bitmapCardinality == numRows) {
return "1";
} else {
double fullness = bitmapCardinality / (double) numRows;
int index = Arrays.binarySearch(BITMAP_FULLNESS_FACTORIZATION_STOPS, fullness);
if (index < 0) {
index = ~index;
}
return FACTORIZED_FULLNESS[index];
}
}
final IntIterator itr;
final String fullness;
int val;
public static IntIterator getReverseBitmapOffsetIterator(ImmutableBitmap bitmapIndex)
{
@ -55,16 +143,25 @@ public class BitmapOffset implements Offset
return ((WrappedImmutableRoaringBitmap) roaringBitmap).getBitmap().getReverseIntIterator();
}
public BitmapOffset(BitmapFactory bitmapFactory, ImmutableBitmap bitmapIndex, boolean descending)
public static BitmapOffset of(ImmutableBitmap bitmapIndex, boolean descending, long numRows)
{
this.bitmapFactory = bitmapFactory;
this.bitmapIndex = bitmapIndex;
this.descending = descending;
this.itr = newIterator();
if (bitmapIndex instanceof WrappedImmutableRoaringBitmap ||
bitmapIndex instanceof WrappedRoaringBitmap ||
descending) {
return new RoaringBitmapOffset(bitmapIndex, descending, numRows);
} else {
return new BitmapOffset(bitmapIndex, descending, numRows);
}
}
private BitmapOffset(ImmutableBitmap bitmapIndex, boolean descending, long numRows)
{
this.itr = newIterator(bitmapIndex, descending);
this.fullness = factorizeFullness(bitmapIndex.size(), numRows);
increment();
}
private IntIterator newIterator()
private IntIterator newIterator(ImmutableBitmap bitmapIndex, boolean descending)
{
if (!descending) {
return bitmapIndex.iterator();
@ -73,13 +170,11 @@ public class BitmapOffset implements Offset
}
}
private BitmapOffset(BitmapOffset otherOffset)
private BitmapOffset(String fullness, IntIterator itr, int val)
{
this.bitmapFactory = otherOffset.bitmapFactory;
this.bitmapIndex = otherOffset.bitmapIndex;
this.descending = otherOffset.descending;
this.itr = otherOffset.itr.clone();
this.val = otherOffset.val;
this.fullness = fullness;
this.itr = itr;
this.val = val;
}
@Override
@ -101,11 +196,7 @@ public class BitmapOffset implements Offset
@Override
public Offset clone()
{
if (bitmapIndex == null || bitmapIndex.size() == 0) {
return new BitmapOffset(bitmapFactory, bitmapFactory.makeEmptyImmutableBitmap(), descending);
}
return new BitmapOffset(this);
return new BitmapOffset(fullness, itr.clone(), val);
}
@Override
@ -113,4 +204,31 @@ public class BitmapOffset implements Offset
{
return val;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("itr", itr);
inspector.visit("fullness", fullness);
}
public static class RoaringBitmapOffset extends BitmapOffset
{
public RoaringBitmapOffset(ImmutableBitmap bitmapIndex, boolean descending, long numRows)
{
super(bitmapIndex, descending, numRows);
}
RoaringBitmapOffset(String fullness, IntIterator itr, int val)
{
super(fullness, itr, val);
}
@Override
public Offset clone()
{
return new RoaringBitmapOffset(fullness, itr.hasNext() ? itr.clone() : EmptyIntIterator.instance(), val);
}
}
}

View File

@ -24,9 +24,11 @@ package io.druid.segment;import org.joda.time.DateTime;
public interface Cursor extends ColumnSelectorFactory
{
public DateTime getTime();
public void advance();
public void advanceTo(int offset);
public boolean isDone();
public void reset();
DateTime getTime();
void advance();
void advanceUninterruptibly();
void advanceTo(int offset);
boolean isDone();
boolean isDoneOrInterrupted();
void reset();
}

View File

@ -21,13 +21,15 @@ package io.druid.segment;
import com.google.common.base.Predicate;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.monomorphicprocessing.CalledFromHotLoop;
import io.druid.query.monomorphicprocessing.HotLoopCallee;
import io.druid.segment.data.IndexedInts;
import javax.annotation.Nullable;
/**
*/
public interface DimensionSelector extends ColumnValueSelector
public interface DimensionSelector extends ColumnValueSelector, HotLoopCallee
{
public static int CARDINALITY_UNKNOWN = -1;
@ -38,6 +40,7 @@ public interface DimensionSelector extends ColumnValueSelector
*
* @return all values for the row as an IntBuffer
*/
@CalledFromHotLoop
public IndexedInts getRow();
/**
@ -90,6 +93,7 @@ public interface DimensionSelector extends ColumnValueSelector
* @param id id to lookup the field name for
* @return the field name for the given id
*/
@CalledFromHotLoop
public String lookupName(int id);
/**

View File

@ -19,12 +19,16 @@
package io.druid.segment;
import io.druid.query.monomorphicprocessing.CalledFromHotLoop;
import io.druid.query.monomorphicprocessing.HotLoopCallee;
/**
* An object that gets a metric value. Metric values are always floats and there is an assumption that the
* FloatColumnSelector has a handle onto some other stateful object (e.g. an Offset) which is changing between calls
* to get() (though, that doesn't have to be the case if you always want the same value...).
*/
public interface FloatColumnSelector extends ColumnValueSelector
public interface FloatColumnSelector extends ColumnValueSelector, HotLoopCallee
{
@CalledFromHotLoop
public float get();
}

View File

@ -22,6 +22,7 @@ package io.druid.segment;
import io.druid.collections.bitmap.BitmapFactory;
import io.druid.collections.bitmap.MutableBitmap;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.Indexed;
import io.druid.segment.incremental.IncrementalIndex;
@ -115,6 +116,11 @@ public class FloatDimensionIndexer implements DimensionIndexer<Float, Float, Flo
float floatVal = (Float) dims[dimIndex];
return (long) floatVal;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
}
}
return new IndexerLongColumnSelector();
@ -140,6 +146,11 @@ public class FloatDimensionIndexer implements DimensionIndexer<Float, Float, Flo
return (Float) dims[dimIndex];
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
}
}
return new IndexerFloatColumnSelector();

View File

@ -20,6 +20,7 @@
package io.druid.segment;
import io.druid.query.extraction.ExtractionFn;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.virtual.BaseSingleValueDimensionSelector;
public class FloatWrappingDimensionSelector extends BaseSingleValueDimensionSelector
@ -42,4 +43,11 @@ public class FloatWrappingDimensionSelector extends BaseSingleValueDimensionSele
return extractionFn.apply(selector.get());
}
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
inspector.visit("extractionFn", extractionFn);
}
}

View File

@ -19,9 +19,13 @@
package io.druid.segment;
import io.druid.query.monomorphicprocessing.CalledFromHotLoop;
import io.druid.query.monomorphicprocessing.HotLoopCallee;
/**
*/
public interface LongColumnSelector extends ColumnValueSelector
public interface LongColumnSelector extends ColumnValueSelector, HotLoopCallee
{
@CalledFromHotLoop
public long get();
}

View File

@ -22,6 +22,7 @@ package io.druid.segment;
import io.druid.collections.bitmap.BitmapFactory;
import io.druid.collections.bitmap.MutableBitmap;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.Indexed;
import io.druid.segment.incremental.IncrementalIndex;
@ -114,6 +115,11 @@ public class LongDimensionIndexer implements DimensionIndexer<Long, Long, Long>
return (Long) dims[dimIndex];
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
}
}
return new IndexerLongColumnSelector();
@ -140,6 +146,11 @@ public class LongDimensionIndexer implements DimensionIndexer<Long, Long, Long>
long longVal = (Long) dims[dimIndex];
return (float) longVal;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
}
}
return new IndexerFloatColumnSelector();

View File

@ -20,6 +20,7 @@
package io.druid.segment;
import io.druid.query.extraction.ExtractionFn;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.virtual.BaseSingleValueDimensionSelector;
public class LongWrappingDimensionSelector extends BaseSingleValueDimensionSelector
@ -42,4 +43,11 @@ public class LongWrappingDimensionSelector extends BaseSingleValueDimensionSelec
return extractionFn.apply(selector.get());
}
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
inspector.visit("extractionFn", extractionFn);
}
}

View File

@ -22,6 +22,7 @@ package io.druid.segment;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.ZeroIndexedInts;
import io.druid.segment.filter.BooleanValueMatcher;
@ -91,4 +92,9 @@ public class NullDimensionSelector implements DimensionSelector, IdLookup
{
return Strings.isNullOrEmpty(name) ? 0 : -1;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
}
}

View File

@ -19,8 +19,12 @@
package io.druid.segment;
import io.druid.query.monomorphicprocessing.CalledFromHotLoop;
public interface ObjectColumnSelector<T> extends ColumnValueSelector
{
public Class<T> classOfObject();
@CalledFromHotLoop
public T get();
}

View File

@ -31,13 +31,14 @@ import io.druid.collections.bitmap.ImmutableBitmap;
import io.druid.java.util.common.granularity.Granularity;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.QueryInterruptedException;
import io.druid.query.BaseQuery;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.extraction.ExtractionFn;
import io.druid.query.filter.BooleanFilter;
import io.druid.query.filter.Filter;
import io.druid.query.filter.RowOffsetMatcherFactory;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.column.BitmapIndex;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
@ -274,10 +275,10 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
offset = new NoFilterOffset(0, index.getNumRows(), descending);
} else {
// Use AndFilter.getBitmapIndex to intersect the preFilters to get its short-circuiting behavior.
offset = new BitmapOffset(
selector.getBitmapFactory(),
offset = BitmapOffset.of(
AndFilter.getBitmapIndex(selector, preFilters),
descending
descending,
(long) getNumRows()
);
}
}
@ -472,10 +473,20 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
final DictionaryEncodedColumn<String> column = cachedColumn;
abstract class QueryableDimensionSelector implements DimensionSelector, IdLookup
{
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("column", column);
inspector.visit("cursorOffset", cursorOffset);
inspector.visit("extractionFn", extractionFn);
}
}
if (column == null) {
return NullDimensionSelector.instance();
} else if (columnDesc.getCapabilities().hasMultipleValues()) {
class MultiValueDimensionSelector implements DimensionSelector, IdLookup
class MultiValueDimensionSelector extends QueryableDimensionSelector
{
@Override
public IndexedInts getRow()
@ -536,7 +547,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
}
return new MultiValueDimensionSelector();
} else {
class SingleValueDimensionSelector implements DimensionSelector, IdLookup
class SingleValueDimensionSelector extends QueryableDimensionSelector
{
@Override
public IndexedInts getRow()
@ -690,6 +701,13 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
{
return metricVals.getFloatSingleValueRow(cursorOffset.getOffset());
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("metricVals", metricVals);
inspector.visit("cursorOffset", cursorOffset);
}
};
}
@ -724,6 +742,13 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
{
return metricVals.getLongSingleValueRow(cursorOffset.getOffset());
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("metricVals", metricVals);
inspector.visit("cursorOffset", cursorOffset);
}
};
}
@ -912,9 +937,13 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
@Override
public void advance()
{
if (Thread.interrupted()) {
throw new QueryInterruptedException(new InterruptedException());
}
BaseQuery.checkInterrupted();
cursorOffset.increment();
}
@Override
public void advanceUninterruptibly()
{
cursorOffset.increment();
}
@ -934,6 +963,12 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
return !cursorOffset.withinBounds();
}
@Override
public boolean isDoneOrInterrupted()
{
return isDone() || Thread.currentThread().isInterrupted();
}
@Override
public void reset()
{
@ -981,15 +1016,28 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
@Override
public void advance()
{
if (Thread.interrupted()) {
throw new QueryInterruptedException(new InterruptedException());
}
BaseQuery.checkInterrupted();
cursorOffset.increment();
while (!isDone()) {
if (Thread.interrupted()) {
throw new QueryInterruptedException(new InterruptedException());
BaseQuery.checkInterrupted();
if (filterMatcher.matches()) {
return;
} else {
cursorOffset.increment();
}
}
}
@Override
public void advanceUninterruptibly()
{
if (Thread.currentThread().isInterrupted()) {
return;
}
cursorOffset.increment();
while (!isDoneOrInterrupted()) {
if (filterMatcher.matches()) {
return;
} else {
@ -1014,6 +1062,12 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
return !cursorOffset.withinBounds();
}
@Override
public boolean isDoneOrInterrupted()
{
return isDone() || Thread.currentThread().isInterrupted();
}
@Override
public void reset()
{
@ -1166,6 +1220,14 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
{
throw new IllegalStateException("clone");
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("baseOffset", baseOffset);
inspector.visit("timestamps", timestamps);
inspector.visit("allWithinThreshold", allWithinThreshold);
}
}
private static class AscendingTimestampCheckingOffset extends TimestampCheckingOffset
@ -1275,6 +1337,12 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
{
return currentOffset + "/" + rowCount + (descending ? "(DSC)" : "");
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("descending", descending);
}
}
@Override

View File

@ -22,6 +22,7 @@ package io.druid.segment;
import com.google.common.base.Predicate;
import io.druid.query.extraction.ExtractionFn;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.SingleIndexedInt;
@ -156,4 +157,12 @@ public class SingleScanTimeDimSelector implements DimensionSelector
{
return null;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("selector", selector);
inspector.visit("extractionFn", extractionFn);
inspector.visit("descending", descending);
}
}

View File

@ -32,6 +32,7 @@ import io.druid.data.input.impl.DimensionSchema.MultiValueHandling;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.extraction.ExtractionFn;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.ArrayBasedIndexedInts;
import io.druid.segment.data.Indexed;
@ -541,6 +542,12 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
}
return getEncodedValue(name, false);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("currEntry", currEntry);
}
}
return new IndexerDimensionSelector();
}

View File

@ -19,6 +19,8 @@
package io.druid.segment;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
public final class ZeroFloatColumnSelector implements FloatColumnSelector
{
private static final ZeroFloatColumnSelector INSTANCE = new ZeroFloatColumnSelector();
@ -38,4 +40,9 @@ public final class ZeroFloatColumnSelector implements FloatColumnSelector
{
return 0.0f;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
}
}

View File

@ -19,6 +19,8 @@
package io.druid.segment;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
public final class ZeroLongColumnSelector implements LongColumnSelector
{
private static final ZeroLongColumnSelector INSTANCE = new ZeroLongColumnSelector();
@ -38,4 +40,9 @@ public final class ZeroLongColumnSelector implements LongColumnSelector
{
return 0;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
}
}

View File

@ -19,6 +19,8 @@
package io.druid.segment.column;
import io.druid.query.monomorphicprocessing.CalledFromHotLoop;
import io.druid.query.monomorphicprocessing.HotLoopCallee;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedFloats;
import io.druid.segment.data.IndexedLongs;
@ -27,17 +29,23 @@ import java.io.Closeable;
/**
*/
public interface GenericColumn extends Closeable
public interface GenericColumn extends HotLoopCallee, Closeable
{
public int length();
public ValueType getType();
public boolean hasMultipleValues();
@CalledFromHotLoop
public String getStringSingleValueRow(int rowNum);
@CalledFromHotLoop
public Indexed<String> getStringMultiValueRow(int rowNum);
@CalledFromHotLoop
public float getFloatSingleValueRow(int rowNum);
@CalledFromHotLoop
public IndexedFloats getFloatMultiValueRow(int rowNum);
@CalledFromHotLoop
public long getLongSingleValueRow(int rowNum);
@CalledFromHotLoop
public IndexedLongs getLongMultiValueRow(int rowNum);
@Override

View File

@ -19,6 +19,7 @@
package io.druid.segment.column;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedFloats;
import io.druid.segment.data.IndexedLongs;
@ -94,4 +95,10 @@ public class IndexedFloatsGenericColumn implements GenericColumn
{
column.close();
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("column", column);
}
}

View File

@ -19,6 +19,7 @@
package io.druid.segment.column;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedFloats;
import io.druid.segment.data.IndexedLongs;
@ -94,4 +95,10 @@ public class IndexedLongsGenericColumn implements GenericColumn
{
column.close();
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("column", column);
}
}

View File

@ -19,6 +19,8 @@
package io.druid.segment.data;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
/**
*/
public class ArrayBasedOffset implements Offset
@ -67,4 +69,9 @@ public class ArrayBasedOffset implements Offset
retVal.currIndex = currIndex;
return retVal;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
}
}

View File

@ -19,6 +19,8 @@
package io.druid.segment.data;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
/**
*/
public class IntersectingOffset implements Offset {
@ -91,4 +93,11 @@ public class IntersectingOffset implements Offset {
final Offset rhsClone = rhs.clone();
return new IntersectingOffset(lhsClone, rhsClone);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("lhs", lhs);
inspector.visit("rhs", rhs);
}
}

View File

@ -19,14 +19,18 @@
package io.druid.segment.data;
import io.druid.query.monomorphicprocessing.CalledFromHotLoop;
/**
* The "mutable" version of a ReadableOffset. Introduces "increment()" and "withinBounds()" methods, which are
* very similar to "next()" and "hasNext()" on the Iterator interface except increment() does not return a value.
*/
public interface Offset extends ReadableOffset
{
@CalledFromHotLoop
void increment();
@CalledFromHotLoop
boolean withinBounds();
Offset clone();

View File

@ -19,6 +19,9 @@
package io.druid.segment.data;
import io.druid.query.monomorphicprocessing.CalledFromHotLoop;
import io.druid.query.monomorphicprocessing.HotLoopCallee;
/**
* A ReadableOffset is an object that provides an integer offset, ostensibly as an index into an array.
*
@ -26,8 +29,9 @@ package io.druid.segment.data;
* given to classes (e.g. FloatColumnSelector objects) by something which keeps a reference to the base Offset object
* and increments it.
*/
public interface ReadableOffset
public interface ReadableOffset extends HotLoopCallee
{
@CalledFromHotLoop
int getOffset();
}

View File

@ -19,6 +19,8 @@
package io.druid.segment.data;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
/**
*/
public class UnioningOffset implements Offset
@ -135,4 +137,11 @@ public class UnioningOffset implements Offset
return new UnioningOffset(newOffsets, newOffsetValues, nextOffsetIndex);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("lhs", offsets[0]);
inspector.visit("rhs", offsets[1]);
}
}

View File

@ -26,11 +26,12 @@ import com.google.common.collect.Lists;
import io.druid.java.util.common.granularity.Granularity;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.QueryInterruptedException;
import io.druid.query.BaseQuery;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.extraction.ExtractionFn;
import io.druid.query.filter.Filter;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.Capabilities;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionHandler;
@ -273,8 +274,31 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
}
while (baseIter.hasNext()) {
if (Thread.interrupted()) {
throw new QueryInterruptedException(new InterruptedException());
BaseQuery.checkInterrupted();
currEntry.set(baseIter.next());
if (filterMatcher.matches()) {
return;
}
}
if (!filterMatcher.matches()) {
done = true;
}
}
@Override
public void advanceUninterruptibly()
{
if (!baseIter.hasNext()) {
done = true;
return;
}
while (baseIter.hasNext()) {
if (Thread.currentThread().isInterrupted()) {
return;
}
currEntry.set(baseIter.next());
@ -305,6 +329,12 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
return done;
}
@Override
public boolean isDoneOrInterrupted()
{
return isDone() || Thread.currentThread().isInterrupted();
}
@Override
public void reset()
{
@ -316,9 +346,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
Iterators.advance(baseIter, numAdvanced);
}
if (Thread.interrupted()) {
throw new QueryInterruptedException(new InterruptedException());
}
BaseQuery.checkInterrupted();
boolean foundMatched = false;
while (baseIter.hasNext()) {
@ -414,6 +442,12 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
{
return index.getMetricFloatValue(currEntry.getValue(), metricIndex);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("index", index);
}
};
}
@ -425,14 +459,20 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
}
if (columnName.equals(Column.TIME_COLUMN_NAME)) {
return new LongColumnSelector()
class TimeLongColumnSelector implements LongColumnSelector
{
@Override
public long get()
{
return currEntry.getKey().getTimestamp();
}
};
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
}
}
return new TimeLongColumnSelector();
}
final Integer dimIndex = index.getDimensionIndex(columnName);
@ -462,6 +502,12 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
metricIndex
);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("index", index);
}
};
}

View File

@ -21,6 +21,7 @@ package io.druid.segment.virtual;
import com.google.common.base.Predicate;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.monomorphicprocessing.CalledFromHotLoop;
import io.druid.segment.DimensionSelector;
import io.druid.segment.IdLookup;
import io.druid.segment.data.IndexedInts;
@ -31,6 +32,7 @@ import java.util.Objects;
public abstract class BaseSingleValueDimensionSelector implements DimensionSelector
{
@CalledFromHotLoop
protected abstract String getValue();
@Override

View File

@ -21,6 +21,7 @@ package io.druid.segment.virtual;
import io.druid.math.expr.Expr;
import io.druid.query.extraction.ExtractionFn;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.DimensionSelector;
import io.druid.segment.FloatColumnSelector;
@ -56,6 +57,12 @@ public class ExpressionSelectors
final Number number = baseSelector.get();
return number != null ? number.longValue() : nullValue;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("baseSelector", baseSelector);
}
}
return new ExpressionLongColumnSelector();
}
@ -75,6 +82,12 @@ public class ExpressionSelectors
final Number number = baseSelector.get();
return number != null ? number.floatValue() : nullValue;
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("baseSelector", baseSelector);
}
}
return new ExpressionFloatColumnSelector();
}
@ -96,6 +109,12 @@ public class ExpressionSelectors
final Number number = baseSelector.get();
return number == null ? null : String.valueOf(number);
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("baseSelector", baseSelector);
}
}
return new DefaultExpressionDimensionSelector();
} else {
@ -106,6 +125,12 @@ public class ExpressionSelectors
{
return extractionFn.apply(baseSelector.get());
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("baseSelector", baseSelector);
}
}
return new ExtractionExpressionDimensionSelector();
}

View File

@ -36,6 +36,7 @@ import io.druid.query.filter.RegexDimFilter;
import io.druid.query.filter.SearchQueryDimFilter;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.query.ordering.StringComparators;
import io.druid.query.search.search.ContainsSearchQuerySpec;
import io.druid.segment.ColumnSelectorFactory;
@ -167,6 +168,11 @@ public class FilteredAggregatorTest
}
};
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
}
}
);
} else {

View File

@ -21,7 +21,7 @@ package io.druid.query.aggregation;
import io.druid.hll.HyperLogLogCollector;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.TestLongColumnSelector;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -42,7 +42,7 @@ public class MetricManipulatorFnsTest
final ArrayList<Object[]> constructorArrays = new ArrayList<>();
final long longVal = 13789;
LongMinAggregator longMinAggregator = new LongMinAggregator(
new LongColumnSelector()
new TestLongColumnSelector()
{
@Override
public long get()
@ -81,7 +81,7 @@ public class MetricManipulatorFnsTest
LongSumAggregatorFactory longSumAggregatorFactory = new LongSumAggregatorFactory(NAME, FIELD);
LongSumAggregator longSumAggregator = new LongSumAggregator(
new LongColumnSelector()
new TestLongColumnSelector()
{
@Override
public long get()

View File

@ -19,11 +19,9 @@
package io.druid.query.aggregation;
import io.druid.segment.FloatColumnSelector;
/**
*/
public class TestFloatColumnSelector implements FloatColumnSelector
public class TestFloatColumnSelector extends io.druid.segment.TestFloatColumnSelector
{
private final float[] floats;

View File

@ -19,11 +19,9 @@
package io.druid.query.aggregation;
import io.druid.segment.LongColumnSelector;
/**
*/
public class TestLongColumnSelector implements LongColumnSelector
public class TestLongColumnSelector extends io.druid.segment.TestLongColumnSelector
{
private final long[] longs;

View File

@ -44,7 +44,7 @@ public class CardinalityAggregatorBenchmark extends SimpleBenchmark
CardinalityBufferAggregator agg;
List<DimensionSelector> selectorList;
List<ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>> dimInfoList;
ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>[] dimInfos;
ByteBuffer buf;
int pos;
@ -93,12 +93,9 @@ public class CardinalityAggregatorBenchmark extends SimpleBenchmark
(DimensionSelector) dim1
);
dimInfoList = Lists.newArrayList(dimInfo1);
dimInfos = new ColumnSelectorPlus[] {dimInfo1};
agg = new CardinalityBufferAggregator(
dimInfoList,
byRow
);
agg = new CardinalityBufferAggregator(dimInfos, byRow);
CardinalityAggregatorFactory factory = new CardinalityAggregatorFactory(
"billy",

View File

@ -43,6 +43,7 @@ import io.druid.query.extraction.ExtractionFn;
import io.druid.query.extraction.JavaScriptExtractionFn;
import io.druid.query.extraction.RegexDimExtractionFn;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.DimensionSelector;
import io.druid.segment.DimensionSelectorUtils;
import io.druid.segment.IdLookup;
@ -205,6 +206,11 @@ public class CardinalityAggregatorTest
}
};
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
}
}
/*
@ -418,7 +424,7 @@ public class CardinalityAggregatorTest
public void testBufferAggregateRows() throws Exception
{
CardinalityBufferAggregator agg = new CardinalityBufferAggregator(
dimInfoList,
dimInfoList.toArray(new ColumnSelectorPlus[] {}),
true
);
@ -439,7 +445,7 @@ public class CardinalityAggregatorTest
public void testBufferAggregateValues() throws Exception
{
CardinalityBufferAggregator agg = new CardinalityBufferAggregator(
dimInfoList,
dimInfoList.toArray(new ColumnSelectorPlus[] {}),
false
);

View File

@ -21,6 +21,7 @@ package io.druid.query.dimension;
import com.google.common.base.Predicate;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import io.druid.segment.DimensionSelector;
import io.druid.segment.DimensionSelectorUtils;
import io.druid.segment.IdLookup;
@ -92,4 +93,9 @@ class TestDimensionSelector implements DimensionSelector
}
};
}
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
}
}

View File

@ -26,6 +26,8 @@ import io.druid.segment.DimensionSelector;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.TestFloatColumnSelector;
import io.druid.segment.TestLongColumnSelector;
import io.druid.segment.column.ColumnCapabilities;
public class TestColumnSelectorFactory implements ColumnSelectorFactory
@ -46,7 +48,7 @@ public class TestColumnSelectorFactory implements ColumnSelectorFactory
@Override
public FloatColumnSelector makeFloatColumnSelector(final String columnName)
{
return new FloatColumnSelector()
return new TestFloatColumnSelector()
{
@Override
public float get()
@ -59,7 +61,7 @@ public class TestColumnSelectorFactory implements ColumnSelectorFactory
@Override
public LongColumnSelector makeLongColumnSelector(final String columnName)
{
return new LongColumnSelector()
return new TestLongColumnSelector()
{
@Override
public long get()

View File

@ -0,0 +1,113 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.monomorphicprocessing;
import org.junit.Assert;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;
public class StringRuntimeShapeTest
{
static class Empty implements HotLoopCallee
{
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
}
}
static class Foo implements HotLoopCallee
{
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("flag1", true);
inspector.visit("flag2", false);
inspector.visit("key", "value");
inspector.visit("empty", new Empty());
inspector.visit("object", ByteBuffer.allocate(1));
inspector.visit("array", new Set[] {new HashSet(), new TreeSet()});
inspector.visit("emptyArray", new Set[] {});
}
}
static class Bar implements HotLoopCallee
{
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("foo", new Foo());
inspector.visit("array", new Foo[] {new Foo(), new Foo()});
}
}
@Test
public void testStringRuntimeShape()
{
String barRuntimeShape = StringRuntimeShape.of(new Bar());
Assert.assertEquals(
"io.druid.query.monomorphicprocessing.StringRuntimeShapeTest$Bar {\n"
+ " foo: io.druid.query.monomorphicprocessing.StringRuntimeShapeTest$Foo {\n"
+ " flag1: true,\n"
+ " flag2: false,\n"
+ " key: value,\n"
+ " empty: io.druid.query.monomorphicprocessing.StringRuntimeShapeTest$Empty,\n"
+ " object: java.nio.HeapByteBuffer {order: BIG_ENDIAN},\n"
+ " array: [\n"
+ " java.util.HashSet,\n"
+ " java.util.TreeSet\n"
+ " ],\n"
+ " emptyArray: []\n"
+ " },\n"
+ " array: [\n"
+ " io.druid.query.monomorphicprocessing.StringRuntimeShapeTest$Foo {\n"
+ " flag1: true,\n"
+ " flag2: false,\n"
+ " key: value,\n"
+ " empty: io.druid.query.monomorphicprocessing.StringRuntimeShapeTest$Empty,\n"
+ " object: java.nio.HeapByteBuffer {order: BIG_ENDIAN},\n"
+ " array: [\n"
+ " java.util.HashSet,\n"
+ " java.util.TreeSet\n"
+ " ],\n"
+ " emptyArray: []\n"
+ " },\n"
+ " io.druid.query.monomorphicprocessing.StringRuntimeShapeTest$Foo {\n"
+ " flag1: true,\n"
+ " flag2: false,\n"
+ " key: value,\n"
+ " empty: io.druid.query.monomorphicprocessing.StringRuntimeShapeTest$Empty,\n"
+ " object: java.nio.HeapByteBuffer {order: BIG_ENDIAN},\n"
+ " array: [\n"
+ " java.util.HashSet,\n"
+ " java.util.TreeSet\n"
+ " ],\n"
+ " emptyArray: []\n"
+ " }\n"
+ " ]\n"
+ "}",
barRuntimeShape
);
}
}

View File

@ -45,6 +45,7 @@ import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result;
import io.druid.query.TestQueryRunners;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.DoubleMaxAggregatorFactory;
import io.druid.query.aggregation.DoubleMinAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
@ -107,6 +108,23 @@ public class TopNQueryRunnerTest
{
@Parameterized.Parameters(name="{0}")
public static Iterable<Object[]> constructorFeeder() throws IOException
{
List<QueryRunner<Result<TopNResultValue>>> retVal = queryRunners();
List<Object[]> parameters = new ArrayList<>();
for (int i = 0; i < 8; i++) {
for (QueryRunner<Result<TopNResultValue>> firstParameter : retVal) {
Object[] params = new Object[4];
params[0] = firstParameter;
params[1] = (i & 1) != 0;
params[2] = (i & 2) != 0;
params[3] = (i & 4) != 0;
parameters.add(params);
}
}
return parameters;
}
public static List<QueryRunner<Result<TopNResultValue>>> queryRunners() throws IOException
{
List<QueryRunner<Result<TopNResultValue>>> retVal = Lists.newArrayList();
retVal.addAll(
@ -143,20 +161,53 @@ public class TopNQueryRunnerTest
)
)
);
return QueryRunnerTestHelper.transformToConstructionFeeder(retVal);
return retVal;
}
private final QueryRunner<Result<TopNResultValue>> runner;
private final boolean duplicateSingleAggregatorQueries;
@Rule
public ExpectedException expectedException = ExpectedException.none();
public TopNQueryRunnerTest(
QueryRunner<Result<TopNResultValue>> runner
QueryRunner<Result<TopNResultValue>> runner,
boolean specializeGeneric1AggPooledTopN,
boolean specializeGeneric2AggPooledTopN,
boolean duplicateSingleAggregatorQueries
)
{
this.runner = runner;
PooledTopNAlgorithm.specializeGeneric1AggPooledTopN = specializeGeneric1AggPooledTopN;
PooledTopNAlgorithm.specializeGeneric2AggPooledTopN = specializeGeneric2AggPooledTopN;
this.duplicateSingleAggregatorQueries = duplicateSingleAggregatorQueries;
}
private List<AggregatorFactory> duplicateAggregators(AggregatorFactory aggregatorFactory, AggregatorFactory duplicate)
{
if (duplicateSingleAggregatorQueries) {
return ImmutableList.of(aggregatorFactory, duplicate);
} else {
return Collections.singletonList(aggregatorFactory);
}
}
private List<Map<String, Object>> withDuplicateResults(
List<? extends Map<String, Object>> results,
String key,
String duplicateKey
)
{
if (!duplicateSingleAggregatorQueries) {
return (List<Map<String, Object>>) results;
}
List<Map<String, Object>> resultsWithDuplicates = new ArrayList<>();
for (Map<String, Object> result : results) {
resultsWithDuplicates.add(
ImmutableMap.<String, Object>builder().putAll(result).put(duplicateKey, result.get(key)).build()
);
}
return resultsWithDuplicates;
}
private Sequence<Result<TopNResultValue>> assertExpectedResults(
@ -2992,6 +3043,10 @@ public class TopNQueryRunnerTest
@Test
public void testTopNQueryByComplexMetric()
{
ImmutableList<DimensionSpec> aggregatorDimensionSpecs = ImmutableList.<DimensionSpec>of(new DefaultDimensionSpec(
QueryRunnerTestHelper.qualityDimension,
QueryRunnerTestHelper.qualityDimension
));
TopNQuery query =
new TopNQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
@ -3000,37 +3055,33 @@ public class TopNQueryRunnerTest
.metric(new NumericTopNMetricSpec("numVals"))
.threshold(10)
.intervals(QueryRunnerTestHelper.firstToThird)
.aggregators(
Lists.<AggregatorFactory>newArrayList(
new CardinalityAggregatorFactory(
"numVals",
ImmutableList.<DimensionSpec>of(new DefaultDimensionSpec(
QueryRunnerTestHelper.qualityDimension,
QueryRunnerTestHelper.qualityDimension
)),
false
)
)
)
.aggregators(duplicateAggregators(
new CardinalityAggregatorFactory("numVals", aggregatorDimensionSpecs, false),
new CardinalityAggregatorFactory("numVals1", aggregatorDimensionSpecs, false)
))
.build();
List<Result<TopNResultValue>> expectedResults = Arrays.asList(
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
new DateTime("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>of(
"market", "spot",
"numVals", 9.019833517963864d
withDuplicateResults(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>of(
"market", "spot",
"numVals", 9.019833517963864d
),
ImmutableMap.<String, Object>of(
"market", "total_market",
"numVals", 2.000977198748901d
),
ImmutableMap.<String, Object>of(
"market", "upfront",
"numVals", 2.000977198748901d
)
),
ImmutableMap.<String, Object>of(
"market", "total_market",
"numVals", 2.000977198748901d
),
ImmutableMap.<String, Object>of(
"market", "upfront",
"numVals", 2.000977198748901d
)
"numVals",
"numVals1"
)
)
)
@ -3048,6 +3099,11 @@ public class TopNQueryRunnerTest
QueryRunnerTestHelper.marketDimension,
helloFn);
ImmutableList<DimensionSpec> aggregatorDimensionSpecs = ImmutableList.<DimensionSpec>of(new ExtractionDimensionSpec(
QueryRunnerTestHelper.qualityDimension,
QueryRunnerTestHelper.qualityDimension,
helloFn
));
TopNQuery query =
new TopNQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
@ -3056,30 +3112,25 @@ public class TopNQueryRunnerTest
.metric(new NumericTopNMetricSpec("numVals"))
.threshold(10)
.intervals(QueryRunnerTestHelper.firstToThird)
.aggregators(
Lists.<AggregatorFactory>newArrayList(
new CardinalityAggregatorFactory(
"numVals",
ImmutableList.<DimensionSpec>of(new ExtractionDimensionSpec(
QueryRunnerTestHelper.qualityDimension,
QueryRunnerTestHelper.qualityDimension,
helloFn
)),
false
)
)
)
.aggregators(duplicateAggregators(
new CardinalityAggregatorFactory("numVals", aggregatorDimensionSpecs,false),
new CardinalityAggregatorFactory("numVals1",aggregatorDimensionSpecs,false)
))
.build();
List<Result<TopNResultValue>> expectedResults = Arrays.asList(
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
new DateTime("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>of(
"market", "hello",
"numVals", 1.0002442201269182d
)
withDuplicateResults(
Collections.singletonList(
ImmutableMap.<String, Object>of(
"market", "hello",
"numVals", 1.0002442201269182d
)
),
"numVals",
"numVals1"
)
)
)
@ -3567,21 +3618,28 @@ public class TopNQueryRunnerTest
.metric(new DimensionTopNMetricSpec(null, StringComparators.ALPHANUMERIC))
.threshold(2)
.intervals(QueryRunnerTestHelper.secondOnly)
.aggregators(Lists.<AggregatorFactory>newArrayList(QueryRunnerTestHelper.rowsCount))
.aggregators(duplicateAggregators(
QueryRunnerTestHelper.rowsCount,
new CountAggregatorFactory("rows1")
))
.build();
List<Result<TopNResultValue>> expectedResults = Arrays.asList(
new Result<>(
new DateTime("2011-04-02T00:00:00.000Z"),
new TopNResultValue(
Arrays.asList(
ImmutableMap.<String, Object>of(
"market", "spot",
"rows", 9L
withDuplicateResults(
Arrays.asList(
ImmutableMap.<String, Object>of(
"market", "spot",
"rows", 9L
),
ImmutableMap.<String, Object>of(
"market", "total_market",
"rows", 2L
)
),
ImmutableMap.<String, Object>of(
"market", "total_market",
"rows", 2L
)
"rows",
"rows1"
)
)
)
@ -3599,21 +3657,28 @@ public class TopNQueryRunnerTest
.metric(new DimensionTopNMetricSpec(null, StringComparators.NUMERIC))
.threshold(2)
.intervals(QueryRunnerTestHelper.secondOnly)
.aggregators(Lists.<AggregatorFactory>newArrayList(QueryRunnerTestHelper.rowsCount))
.aggregators(duplicateAggregators(
QueryRunnerTestHelper.rowsCount,
new CountAggregatorFactory("rows1")
))
.build();
List<Result<TopNResultValue>> expectedResults = Arrays.asList(
new Result<>(
new DateTime("2011-04-02T00:00:00.000Z"),
new TopNResultValue(
Arrays.asList(
ImmutableMap.<String, Object>of(
"market", "spot",
"rows", 9L
withDuplicateResults(
Arrays.asList(
ImmutableMap.<String, Object>of(
"market", "spot",
"rows", 9L
),
ImmutableMap.<String, Object>of(
"market", "total_market",
"rows", 2L
)
),
ImmutableMap.<String, Object>of(
"market", "total_market",
"rows", 2L
)
"rows",
"rows1"
)
)
)

View File

@ -26,6 +26,7 @@ import com.google.common.collect.Sets;
import io.druid.collections.bitmap.BitSetBitmapFactory;
import io.druid.collections.bitmap.BitmapFactory;
import io.druid.collections.bitmap.ConciseBitmapFactory;
import io.druid.collections.bitmap.ImmutableBitmap;
import io.druid.collections.bitmap.MutableBitmap;
import io.druid.collections.bitmap.RoaringBitmapFactory;
import io.druid.segment.data.Offset;
@ -81,7 +82,8 @@ public class BitmapOffsetTest
mutable.add(val);
}
final BitmapOffset offset = new BitmapOffset(factory, factory.makeImmutableBitmap(mutable), descending);
ImmutableBitmap bitmap = factory.makeImmutableBitmap(mutable);
final BitmapOffset offset = BitmapOffset.of(bitmap, descending, bitmap.size());
final int[] expected = descending ? TEST_VALS_FLIP : TEST_VALS;
int count = 0;

Some files were not shown because too many files have changed in this diff Show More