add support for aggregate only projections

This commit is contained in:
Clint Wylie 2024-11-15 17:00:49 -08:00
parent 24a1fafaa7
commit c6148db0e1
6 changed files with 117 additions and 34 deletions

View File

@ -32,6 +32,7 @@ import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.OrderBy;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.AggregateProjectionMetadata;
import org.apache.druid.segment.Cursors;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnHolder;
@ -39,6 +40,7 @@ import org.apache.druid.utils.CollectionUtils;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
@ -72,10 +74,10 @@ public class AggregateProjectionSpec
)
{
this.name = name;
if (CollectionUtils.isNullOrEmpty(groupingColumns)) {
throw InvalidInput.exception("groupingColumns must not be null or empty");
if (CollectionUtils.isNullOrEmpty(groupingColumns) && (aggregators == null || aggregators.length == 0)) {
throw InvalidInput.exception("groupingColumns and aggregators must not both be null or empty");
}
this.groupingColumns = groupingColumns;
this.groupingColumns = groupingColumns == null ? Collections.emptyList() : groupingColumns;
this.virtualColumns = virtualColumns == null ? VirtualColumns.EMPTY : virtualColumns;
// in the future this should be expanded to support user specified ordering, but for now we compute it based on
// the grouping columns, which is consistent with how rollup ordering works for incremental index base table
@ -169,6 +171,10 @@ public class AggregateProjectionSpec
private static ProjectionOrdering computeOrdering(VirtualColumns virtualColumns, List<DimensionSchema> groupingColumns)
{
if (groupingColumns.isEmpty()) {
// call it time ordered, there is no grouping columns so there is only 1 row for this projection
return new ProjectionOrdering(Cursors.ascendingTimeOrder(), null);
}
final List<OrderBy> ordering = Lists.newArrayListWithCapacity(groupingColumns.size());
String timeColumnName = null;

View File

@ -29,7 +29,7 @@ import com.google.common.collect.Interner;
import com.google.common.collect.Interners;
import com.google.common.collect.Lists;
import org.apache.druid.data.input.impl.AggregateProjectionSpec;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.OrderBy;
@ -40,6 +40,7 @@ import org.apache.druid.utils.CollectionUtils;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
@ -167,17 +168,17 @@ public class AggregateProjectionMetadata
@JsonProperty("name") String name,
@JsonProperty("timeColumnName") @Nullable String timeColumnName,
@JsonProperty("virtualColumns") @Nullable VirtualColumns virtualColumns,
@JsonProperty("groupingColumns") List<String> groupingColumns,
@JsonProperty("groupingColumns") @Nullable List<String> groupingColumns,
@JsonProperty("aggregators") @Nullable AggregatorFactory[] aggregators,
@JsonProperty("ordering") List<OrderBy> ordering
)
{
this.name = name;
if (CollectionUtils.isNullOrEmpty(groupingColumns)) {
throw InvalidInput.exception("groupingColumns must not be null or empty");
if (CollectionUtils.isNullOrEmpty(groupingColumns) && (aggregators == null || aggregators.length == 0)) {
throw DruidException.defensive("groupingColumns and aggregators must not both be null or empty");
}
this.virtualColumns = virtualColumns == null ? VirtualColumns.EMPTY : virtualColumns;
this.groupingColumns = groupingColumns;
this.groupingColumns = groupingColumns == null ? Collections.emptyList() : groupingColumns;
this.aggregators = aggregators == null ? new AggregatorFactory[0] : aggregators;
this.ordering = ordering;

View File

@ -89,7 +89,7 @@ public class Projections
if (name != null && !name.equals(spec.getSchema().getName())) {
continue;
}
ProjectionMatch match = spec.getSchema().matches(cursorBuildSpec, physicalChecker);
final ProjectionMatch match = spec.getSchema().matches(cursorBuildSpec, physicalChecker);
if (match != null) {
if (cursorBuildSpec.getQueryMetrics() != null) {
cursorBuildSpec.getQueryMetrics().projection(spec.getSchema().getName());

View File

@ -75,7 +75,7 @@ public class AggregateProjectionSpecTest extends InitializedNullHandlingTest
null
)
);
Assert.assertEquals("groupingColumns must not be null or empty", t.getMessage());
Assert.assertEquals("groupingColumns and aggregators must not both be null or empty", t.getMessage());
t = Assert.assertThrows(
DruidException.class,
@ -86,7 +86,7 @@ public class AggregateProjectionSpecTest extends InitializedNullHandlingTest
null
)
);
Assert.assertEquals("groupingColumns must not be null or empty", t.getMessage());
Assert.assertEquals("groupingColumns and aggregators must not both be null or empty", t.getMessage());
}
@Test

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import it.unimi.dsi.fastutil.objects.ObjectAVLTreeSet;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.OrderBy;
import org.apache.druid.query.aggregation.AggregatorFactory;
@ -33,6 +34,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.SortedSet;
public class AggregateProjectionMetadataTest extends InitializedNullHandlingTest
@ -132,6 +134,42 @@ public class AggregateProjectionMetadataTest extends InitializedNullHandlingTest
Assert.assertEquals(good, metadataBest.last());
}
@Test
public void testInvalidGrouping()
{
Throwable t = Assert.assertThrows(
DruidException.class,
() -> new AggregateProjectionMetadata(
new AggregateProjectionMetadata.Schema(
"other_projection",
null,
null,
null,
null,
null
),
0
)
);
Assert.assertEquals("groupingColumns and aggregators must not both be null or empty", t.getMessage());
t = Assert.assertThrows(
DruidException.class,
() -> new AggregateProjectionMetadata(
new AggregateProjectionMetadata.Schema(
"other_projection",
null,
null,
Collections.emptyList(),
null,
null
),
0
)
);
Assert.assertEquals("groupingColumns and aggregators must not both be null or empty", t.getMessage());
}
@Test
public void testEqualsAndHashcode()
{

View File

@ -228,15 +228,21 @@ public class CursorFactoryProjectionTest extends InitializedNullHandlingTest
),
null
),
// cannot really make an 'all' granularity projection, but can do something like floor time to the segment
// granularity interval resulting in a single row
new AggregateProjectionSpec(
"c_sum",
"c_sum_daily",
VirtualColumns.create(Granularities.toVirtualColumn(Granularities.DAY, "__gran")),
Collections.singletonList(new LongDimensionSchema("__gran")),
new AggregatorFactory[]{
new LongSumAggregatorFactory("_c_sum", "c")
}
),
new AggregateProjectionSpec(
"c_sum",
VirtualColumns.EMPTY,
Collections.emptyList(),
new AggregatorFactory[]{
new LongSumAggregatorFactory("_c_sum", "c")
}
)
);
@ -1062,27 +1068,9 @@ public class CursorFactoryProjectionTest extends InitializedNullHandlingTest
Assert.assertArrayEquals(new Object[]{"b", null, 12L, 13.2}, results.get(1).getArray());
}
private static IndexBuilder makeBuilder(DimensionsSpec dimensionsSpec, boolean autoSchema)
{
File tmp = FileUtils.createTempDir();
CLOSER.register(tmp::delete);
return IndexBuilder.create()
.tmpDir(tmp)
.schema(
IncrementalIndexSchema.builder()
.withDimensionsSpec(dimensionsSpec)
.withRollup(false)
.withMinTimestamp(TIMESTAMP.getMillis())
.withProjections(autoSchema ? AUTO_PROJECTIONS : PROJECTIONS)
.build()
)
.rows(ROWS);
}
@Test
public void testTimeseriesQueryGranularityFitsProjectionGranularity()
{
Assume.assumeFalse(sortByDim);
final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.intervals(ImmutableList.of(Intervals.ETERNITY))
@ -1116,9 +1104,43 @@ public class CursorFactoryProjectionTest extends InitializedNullHandlingTest
}
@Test
public void testTimeseriesQueryGranularityAllFitsProjectionGranularity()
public void testTimeseriesQueryGranularityAllFitsProjectionGranularityWithSegmentGranularity()
{
final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.intervals(ImmutableList.of(Intervals.ETERNITY))
.granularity(Granularities.ALL)
.aggregators(new LongSumAggregatorFactory("c_sum", "c"))
.context(ImmutableMap.of(QueryContexts.USE_PROJECTION, "c_sum_daily"))
.build();
final CursorBuildSpec buildSpec = TimeseriesQueryEngine.makeCursorBuildSpec(query, null);
try (final CursorHolder cursorHolder = projectionsCursorFactory.makeCursorHolder(buildSpec)) {
final Cursor cursor = cursorHolder.asCursor();
int rowCount = 0;
while (!cursor.isDone()) {
rowCount++;
cursor.advance();
}
Assert.assertEquals(1, rowCount);
}
final Sequence<Result<TimeseriesResultValue>> resultRows = timeseriesEngine.process(
query,
projectionsCursorFactory,
projectionsTimeBoundaryInspector,
null
);
final List<Result<TimeseriesResultValue>> results = resultRows.toList();
Assert.assertEquals(1, results.size());
final RowSignature querySignature = query.getResultRowSignature(RowSignature.Finalization.YES);
Assert.assertArrayEquals(new Object[]{TIMESTAMP, 19L}, getResultArray(results.get(0), querySignature));
}
@Test
public void testTimeseriesQueryGranularityAllFitsProjectionGranularityWithNoGrouping()
{
Assume.assumeFalse(sortByDim);
final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.intervals(ImmutableList.of(Intervals.ETERNITY))
@ -1194,6 +1216,22 @@ public class CursorFactoryProjectionTest extends InitializedNullHandlingTest
Assert.assertArrayEquals(new Object[]{TIMESTAMP.plusHours(1).plusMinutes(1), 2L}, getResultArray(results.get(7), querySignature));
}
private static IndexBuilder makeBuilder(DimensionsSpec dimensionsSpec, boolean autoSchema)
{
File tmp = FileUtils.createTempDir();
CLOSER.register(tmp::delete);
return IndexBuilder.create()
.tmpDir(tmp)
.schema(
IncrementalIndexSchema.builder()
.withDimensionsSpec(dimensionsSpec)
.withRollup(false)
.withMinTimestamp(TIMESTAMP.getMillis())
.withProjections(autoSchema ? AUTO_PROJECTIONS : PROJECTIONS)
.build()
)
.rows(ROWS);
}
private static Set<Object[]> makeArrayResultSet()
{