allow quantiles merge aggregator to also accept doubles (#7718)

* allow quantiles merge aggregator to also accept doubles

* consolidate dupe

* import
This commit is contained in:
Clint Wylie 2019-05-23 11:13:41 -07:00 committed by Himanshu
parent 6964ac23a2
commit 23e96d15d4
4 changed files with 106 additions and 37 deletions

View File

@ -28,14 +28,12 @@ public class DoublesSketchBuildAggregator implements Aggregator
{ {
private final ColumnValueSelector<Double> valueSelector; private final ColumnValueSelector<Double> valueSelector;
private final int size;
private UpdateDoublesSketch sketch; private UpdateDoublesSketch sketch;
public DoublesSketchBuildAggregator(final ColumnValueSelector<Double> valueSelector, final int size) public DoublesSketchBuildAggregator(final ColumnValueSelector<Double> valueSelector, final int size)
{ {
this.valueSelector = valueSelector; this.valueSelector = valueSelector;
this.size = size;
sketch = DoublesSketch.builder().setK(size).build(); sketch = DoublesSketch.builder().setK(size).build();
} }
@ -68,5 +66,4 @@ public class DoublesSketchBuildAggregator implements Aggregator
{ {
sketch = null; sketch = null;
} }
} }

View File

@ -27,10 +27,10 @@ import org.apache.druid.segment.ColumnValueSelector;
public class DoublesSketchMergeAggregator implements Aggregator public class DoublesSketchMergeAggregator implements Aggregator
{ {
private final ColumnValueSelector<DoublesSketch> selector; private final ColumnValueSelector selector;
private DoublesUnion union; private DoublesUnion union;
public DoublesSketchMergeAggregator(final ColumnValueSelector<DoublesSketch> selector, final int k) public DoublesSketchMergeAggregator(final ColumnValueSelector selector, final int k)
{ {
this.selector = selector; this.selector = selector;
union = DoublesUnion.builder().setMaxK(k).build(); union = DoublesUnion.builder().setMaxK(k).build();
@ -39,13 +39,10 @@ public class DoublesSketchMergeAggregator implements Aggregator
@Override @Override
public synchronized void aggregate() public synchronized void aggregate()
{ {
final DoublesSketch sketch = selector.getObject(); updateUnion(selector, union);
if (sketch == null) {
return;
}
union.update(sketch);
} }
@Override @Override
public synchronized Object get() public synchronized Object get()
{ {
@ -70,4 +67,16 @@ public class DoublesSketchMergeAggregator implements Aggregator
union = null; union = null;
} }
/**
 * Folds the selector's current value into the given union.
 *
 * <ul>
 *   <li>A {@link DoublesSketch} object is merged into the union as a sketch.</li>
 *   <li>Any other non-null object is added as a single value read through
 *       {@code selector.getDouble()}.</li>
 *   <li>A null object leaves the union untouched.</li>
 * </ul>
 *
 * @param selector source of the current row's value
 * @param union    union that receives the sketch or the double value
 */
static void updateUnion(ColumnValueSelector selector, DoublesUnion union)
{
  final Object value = selector.getObject();
  if (value instanceof DoublesSketch) {
    union.update((DoublesSketch) value);
    return;
  }
  // instanceof is false for null, so the null case ends up here and is skipped.
  if (value != null) {
    union.update(selector.getDouble());
  }
}
} }

View File

@ -20,7 +20,6 @@
package org.apache.druid.query.aggregation.datasketches.quantiles; package org.apache.druid.query.aggregation.datasketches.quantiles;
import com.yahoo.memory.WritableMemory; import com.yahoo.memory.WritableMemory;
import com.yahoo.sketches.quantiles.DoublesSketch;
import com.yahoo.sketches.quantiles.DoublesUnion; import com.yahoo.sketches.quantiles.DoublesUnion;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
@ -34,14 +33,14 @@ import java.util.IdentityHashMap;
public class DoublesSketchMergeBufferAggregator implements BufferAggregator public class DoublesSketchMergeBufferAggregator implements BufferAggregator
{ {
private final ColumnValueSelector<DoublesSketch> selector; private final ColumnValueSelector selector;
private final int k; private final int k;
private final int maxIntermediateSize; private final int maxIntermediateSize;
private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>(); private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();
private final IdentityHashMap<ByteBuffer, Int2ObjectMap<DoublesUnion>> unions = new IdentityHashMap<>(); private final IdentityHashMap<ByteBuffer, Int2ObjectMap<DoublesUnion>> unions = new IdentityHashMap<>();
public DoublesSketchMergeBufferAggregator( public DoublesSketchMergeBufferAggregator(
final ColumnValueSelector<DoublesSketch> selector, final ColumnValueSelector selector,
final int k, final int k,
final int maxIntermediateSize) final int maxIntermediateSize)
{ {
@ -62,12 +61,8 @@ public class DoublesSketchMergeBufferAggregator implements BufferAggregator
@Override @Override
public synchronized void aggregate(final ByteBuffer buffer, final int position) public synchronized void aggregate(final ByteBuffer buffer, final int position)
{ {
final DoublesSketch sketch = selector.getObject();
if (sketch == null) {
return;
}
final DoublesUnion union = unions.get(buffer).get(position); final DoublesUnion union = unions.get(buffer).get(position);
union.update(sketch); DoublesSketchMergeAggregator.updateUnion(selector, union);
} }
@Override @Override

View File

@ -125,7 +125,8 @@ public class DoublesSketchSqlAggregatorTest extends CalciteTestBase
CalciteTests.getJsonMapper().registerModule(mod); CalciteTests.getJsonMapper().registerModule(mod);
} }
final QueryableIndex index = IndexBuilder.create() final QueryableIndex index =
IndexBuilder.create()
.tmpDir(temporaryFolder.newFolder()) .tmpDir(temporaryFolder.newFolder())
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
.schema( .schema(
@ -401,6 +402,73 @@ public class DoublesSketchSqlAggregatorTest extends CalciteTestBase
); );
} }
/**
 * Applies APPROX_QUANTILE_DS to the output of a subquery that itself computes
 * APPROX_QUANTILE_DS, then checks both the returned rows and the single native
 * query captured by {@code queryLogHook} (a GroupBy over a GroupBy query data source).
 */
@Test
public void testQuantileOnInnerQuantileQuery() throws Exception
{
  final String sql = "SELECT dim1, APPROX_QUANTILE_DS(x, 0.5)\n"
                     + "FROM (SELECT dim1, dim2, APPROX_QUANTILE_DS(m1, 0.5) AS x FROM foo GROUP BY dim1, dim2) GROUP BY dim1";
  final SqlLifecycle lifecycle = sqlLifecycleFactory.factorize();
  final List<Object[]> actualRows = lifecycle.runSimple(sql, QUERY_CONTEXT_DEFAULT, authenticationResult).toList();

  // Expected (dim1, quantile) rows, in the order the query is expected to return them.
  final List<Object[]> expectedRows = ImmutableList.<Object[]>of(
      new Object[]{"", 1.0},
      new Object[]{"1", 4.0},
      new Object[]{"10.1", 2.0},
      new Object[]{"2", 3.0},
      new Object[]{"abc", 6.0},
      new Object[]{"def", 5.0}
  );

  final int rowCount = expectedRows.size();
  Assert.assertEquals(rowCount, actualRows.size());
  for (int row = 0; row < rowCount; row++) {
    Assert.assertArrayEquals(expectedRows.get(row), actualRows.get(row));
  }

  // Verify query
  final MultipleIntervalSegmentSpec eternity =
      new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity()));

  // Inner GroupBy: sketch of m1 per (dim1, dim2), reduced to its 0.5 quantile as "a0".
  final GroupByQuery innerQuery =
      GroupByQuery.builder()
                  .setDataSource(CalciteTests.DATASOURCE1)
                  .setInterval(eternity)
                  .setGranularity(Granularities.ALL)
                  .setDimensions(
                      new DefaultDimensionSpec("dim1", "d0"),
                      new DefaultDimensionSpec("dim2", "d1")
                  )
                  .setAggregatorSpecs(
                      ImmutableList.of(
                          new DoublesSketchAggregatorFactory("a0:agg", "m1", 128)
                      )
                  )
                  .setPostAggregatorSpecs(
                      ImmutableList.of(
                          new DoublesSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.5f)
                      )
                  )
                  .setContext(ImmutableMap.of(PlannerContext.CTX_SQL_QUERY_ID, "dummy"))
                  .build();

  // Outer GroupBy: sketch built over the inner query's "a0" column, grouped by "d0".
  final GroupByQuery expectedQuery =
      GroupByQuery.builder()
                  .setDataSource(new QueryDataSource(innerQuery))
                  .setInterval(eternity)
                  .setGranularity(Granularities.ALL)
                  .setDimensions(new DefaultDimensionSpec("d0", "_d0", ValueType.STRING))
                  .setAggregatorSpecs(
                      new DoublesSketchAggregatorFactory("_a0:agg", "a0", 128)
                  )
                  .setPostAggregatorSpecs(
                      ImmutableList.of(
                          new DoublesSketchToQuantilePostAggregator("_a0", makeFieldAccessPostAgg("_a0:agg"), 0.5f)
                      )
                  )
                  .setContext(ImmutableMap.of(PlannerContext.CTX_SQL_QUERY_ID, "dummy"))
                  .build();

  Assert.assertEquals(
      expectedQuery,
      Iterables.getOnlyElement(queryLogHook.getRecordedQueries())
  );
}
private static PostAggregator makeFieldAccessPostAgg(String name) private static PostAggregator makeFieldAccessPostAgg(String name)
{ {
return new FieldAccessPostAggregator(name, name); return new FieldAccessPostAggregator(name, name);