From c6148db0e1a7bdb95efc3b43d9a96e0de3a3aa33 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 15 Nov 2024 17:00:49 -0800 Subject: [PATCH] add support for aggregate only projections --- .../input/impl/AggregateProjectionSpec.java | 12 ++- .../segment/AggregateProjectionMetadata.java | 11 +-- .../segment/projections/Projections.java | 2 +- .../impl/AggregateProjectionSpecTest.java | 4 +- .../AggregateProjectionMetadataTest.java | 38 +++++++++ .../segment/CursorFactoryProjectionTest.java | 84 ++++++++++++++----- 6 files changed, 117 insertions(+), 34 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/AggregateProjectionSpec.java b/processing/src/main/java/org/apache/druid/data/input/impl/AggregateProjectionSpec.java index 5d96ba497fa..ab4f16781f7 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/AggregateProjectionSpec.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/AggregateProjectionSpec.java @@ -32,6 +32,7 @@ import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.query.OrderBy; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.segment.AggregateProjectionMetadata; +import org.apache.druid.segment.Cursors; import org.apache.druid.segment.VirtualColumn; import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ColumnHolder; @@ -39,6 +40,7 @@ import org.apache.druid.utils.CollectionUtils; import javax.annotation.Nullable; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; @@ -72,10 +74,10 @@ public class AggregateProjectionSpec ) { this.name = name; - if (CollectionUtils.isNullOrEmpty(groupingColumns)) { - throw InvalidInput.exception("groupingColumns must not be null or empty"); + if (CollectionUtils.isNullOrEmpty(groupingColumns) && (aggregators == null || aggregators.length == 0)) { + throw InvalidInput.exception("groupingColumns and aggregators must not both be null or empty"); } - this.groupingColumns = groupingColumns; + this.groupingColumns = groupingColumns == null ? Collections.emptyList() : groupingColumns; this.virtualColumns = virtualColumns == null ? VirtualColumns.EMPTY : virtualColumns; // in the future this should be expanded to support user specified ordering, but for now we compute it based on // the grouping columns, which is consistent with how rollup ordering works for incremental index base table @@ -169,6 +171,10 @@ public class AggregateProjectionSpec private static ProjectionOrdering computeOrdering(VirtualColumns virtualColumns, List groupingColumns) { + if (groupingColumns.isEmpty()) { + // call it time ordered, there is no grouping columns so there is only 1 row for this projection + return new ProjectionOrdering(Cursors.ascendingTimeOrder(), null); + } final List ordering = Lists.newArrayListWithCapacity(groupingColumns.size()); String timeColumnName = null; diff --git a/processing/src/main/java/org/apache/druid/segment/AggregateProjectionMetadata.java b/processing/src/main/java/org/apache/druid/segment/AggregateProjectionMetadata.java index 1b63d9cb1df..cfd83fcbb0b 100644 --- a/processing/src/main/java/org/apache/druid/segment/AggregateProjectionMetadata.java +++ b/processing/src/main/java/org/apache/druid/segment/AggregateProjectionMetadata.java @@ -29,7 +29,7 @@ import com.google.common.collect.Interner; import com.google.common.collect.Interners; import com.google.common.collect.Lists; import org.apache.druid.data.input.impl.AggregateProjectionSpec; -import org.apache.druid.error.InvalidInput; +import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.query.OrderBy; @@ -40,6 +40,7 @@ import org.apache.druid.utils.CollectionUtils; import javax.annotation.Nullable; import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Objects; @@ -167,17 +168,17 @@ public class AggregateProjectionMetadata @JsonProperty("name") String name, @JsonProperty("timeColumnName") @Nullable String timeColumnName, @JsonProperty("virtualColumns") @Nullable VirtualColumns virtualColumns, - @JsonProperty("groupingColumns") List groupingColumns, + @JsonProperty("groupingColumns") @Nullable List groupingColumns, @JsonProperty("aggregators") @Nullable AggregatorFactory[] aggregators, @JsonProperty("ordering") List ordering ) { this.name = name; - if (CollectionUtils.isNullOrEmpty(groupingColumns)) { - throw InvalidInput.exception("groupingColumns must not be null or empty"); + if (CollectionUtils.isNullOrEmpty(groupingColumns) && (aggregators == null || aggregators.length == 0)) { + throw DruidException.defensive("groupingColumns and aggregators must not both be null or empty"); } this.virtualColumns = virtualColumns == null ? VirtualColumns.EMPTY : virtualColumns; - this.groupingColumns = groupingColumns; + this.groupingColumns = groupingColumns == null ? Collections.emptyList() : groupingColumns; this.aggregators = aggregators == null ? new AggregatorFactory[0] : aggregators; this.ordering = ordering; diff --git a/processing/src/main/java/org/apache/druid/segment/projections/Projections.java b/processing/src/main/java/org/apache/druid/segment/projections/Projections.java index c38a875226f..a5dd6c38436 100644 --- a/processing/src/main/java/org/apache/druid/segment/projections/Projections.java +++ b/processing/src/main/java/org/apache/druid/segment/projections/Projections.java @@ -89,7 +89,7 @@ public class Projections if (name != null && !name.equals(spec.getSchema().getName())) { continue; } - ProjectionMatch match = spec.getSchema().matches(cursorBuildSpec, physicalChecker); + final ProjectionMatch match = spec.getSchema().matches(cursorBuildSpec, physicalChecker); if (match != null) { if (cursorBuildSpec.getQueryMetrics() != null) { cursorBuildSpec.getQueryMetrics().projection(spec.getSchema().getName()); diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/AggregateProjectionSpecTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/AggregateProjectionSpecTest.java index 60895ceefb0..d7d1ef18221 100644 --- a/processing/src/test/java/org/apache/druid/data/input/impl/AggregateProjectionSpecTest.java +++ b/processing/src/test/java/org/apache/druid/data/input/impl/AggregateProjectionSpecTest.java @@ -75,7 +75,7 @@ public class AggregateProjectionSpecTest extends InitializedNullHandlingTest null ) ); - Assert.assertEquals("groupingColumns must not be null or empty", t.getMessage()); + Assert.assertEquals("groupingColumns and aggregators must not both be null or empty", t.getMessage()); t = Assert.assertThrows( DruidException.class, @@ -86,7 +86,7 @@ public class AggregateProjectionSpecTest extends InitializedNullHandlingTest null ) ); - Assert.assertEquals("groupingColumns must not be null or empty", t.getMessage()); + Assert.assertEquals("groupingColumns and aggregators must not both be null or empty", t.getMessage()); } @Test diff --git a/processing/src/test/java/org/apache/druid/segment/AggregateProjectionMetadataTest.java b/processing/src/test/java/org/apache/druid/segment/AggregateProjectionMetadataTest.java index 49b38da340e..636df8691b3 100644 --- a/processing/src/test/java/org/apache/druid/segment/AggregateProjectionMetadataTest.java +++ b/processing/src/test/java/org/apache/druid/segment/AggregateProjectionMetadataTest.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import it.unimi.dsi.fastutil.objects.ObjectAVLTreeSet; import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.query.OrderBy; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -33,6 +34,7 @@ import org.junit.Assert; import org.junit.Test; import java.util.Arrays; +import java.util.Collections; import java.util.SortedSet; public class AggregateProjectionMetadataTest extends InitializedNullHandlingTest @@ -132,6 +134,42 @@ public class AggregateProjectionMetadataTest extends InitializedNullHandlingTest Assert.assertEquals(good, metadataBest.last()); } + @Test + public void testInvalidGrouping() + { + Throwable t = Assert.assertThrows( + DruidException.class, + () -> new AggregateProjectionMetadata( + new AggregateProjectionMetadata.Schema( + "other_projection", + null, + null, + null, + null, + null + ), + 0 + ) + ); + Assert.assertEquals("groupingColumns and aggregators must not both be null or empty", t.getMessage()); + + t = Assert.assertThrows( + DruidException.class, + () -> new AggregateProjectionMetadata( + new AggregateProjectionMetadata.Schema( + "other_projection", + null, + null, + Collections.emptyList(), + null, + null + ), + 0 + ) + ); + Assert.assertEquals("groupingColumns and aggregators must not both be null or empty", t.getMessage()); + } + @Test public void testEqualsAndHashcode() { diff --git a/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java b/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java index c065d3c6aab..9f7481763f0 100644 --- a/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java +++ b/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java @@ -228,15 +228,21 @@ public class CursorFactoryProjectionTest extends InitializedNullHandlingTest ), null ), - // cannot really make an 'all' granularity projection, but can do something like floor time to the segment - // granularity interval resulting in a single row new AggregateProjectionSpec( - "c_sum", + "c_sum_daily", VirtualColumns.create(Granularities.toVirtualColumn(Granularities.DAY, "__gran")), Collections.singletonList(new LongDimensionSchema("__gran")), new AggregatorFactory[]{ new LongSumAggregatorFactory("_c_sum", "c") } + ), + new AggregateProjectionSpec( + "c_sum", + VirtualColumns.EMPTY, + Collections.emptyList(), + new AggregatorFactory[]{ + new LongSumAggregatorFactory("_c_sum", "c") + } ) ); @@ -1062,27 +1068,9 @@ public class CursorFactoryProjectionTest extends InitializedNullHandlingTest Assert.assertArrayEquals(new Object[]{"b", null, 12L, 13.2}, results.get(1).getArray()); } - private static IndexBuilder makeBuilder(DimensionsSpec dimensionsSpec, boolean autoSchema) - { - File tmp = FileUtils.createTempDir(); - CLOSER.register(tmp::delete); - return IndexBuilder.create() - .tmpDir(tmp) - .schema( - IncrementalIndexSchema.builder() - .withDimensionsSpec(dimensionsSpec) - .withRollup(false) - .withMinTimestamp(TIMESTAMP.getMillis()) - .withProjections(autoSchema ? AUTO_PROJECTIONS : PROJECTIONS) - .build() - ) - .rows(ROWS); - } - @Test public void testTimeseriesQueryGranularityFitsProjectionGranularity() { - Assume.assumeFalse(sortByDim); final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() .dataSource("test") .intervals(ImmutableList.of(Intervals.ETERNITY)) @@ -1116,9 +1104,43 @@ public class CursorFactoryProjectionTest extends InitializedNullHandlingTest } @Test - public void testTimeseriesQueryGranularityAllFitsProjectionGranularity() + public void testTimeseriesQueryGranularityAllFitsProjectionGranularityWithSegmentGranularity() + { + final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() + .dataSource("test") + .intervals(ImmutableList.of(Intervals.ETERNITY)) + .granularity(Granularities.ALL) + .aggregators(new LongSumAggregatorFactory("c_sum", "c")) + .context(ImmutableMap.of(QueryContexts.USE_PROJECTION, "c_sum_daily")) + .build(); + + final CursorBuildSpec buildSpec = TimeseriesQueryEngine.makeCursorBuildSpec(query, null); + try (final CursorHolder cursorHolder = projectionsCursorFactory.makeCursorHolder(buildSpec)) { + final Cursor cursor = cursorHolder.asCursor(); + int rowCount = 0; + while (!cursor.isDone()) { + rowCount++; + cursor.advance(); + } + Assert.assertEquals(1, rowCount); + } + + final Sequence> resultRows = timeseriesEngine.process( + query, + projectionsCursorFactory, + projectionsTimeBoundaryInspector, + null + ); + + final List> results = resultRows.toList(); + Assert.assertEquals(1, results.size()); + final RowSignature querySignature = query.getResultRowSignature(RowSignature.Finalization.YES); + Assert.assertArrayEquals(new Object[]{TIMESTAMP, 19L}, getResultArray(results.get(0), querySignature)); + } + + @Test + public void testTimeseriesQueryGranularityAllFitsProjectionGranularityWithNoGrouping() { - Assume.assumeFalse(sortByDim); final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() .dataSource("test") .intervals(ImmutableList.of(Intervals.ETERNITY)) @@ -1194,6 +1216,22 @@ public class CursorFactoryProjectionTest extends InitializedNullHandlingTest Assert.assertArrayEquals(new Object[]{TIMESTAMP.plusHours(1).plusMinutes(1), 2L}, getResultArray(results.get(7), querySignature)); } + private static IndexBuilder makeBuilder(DimensionsSpec dimensionsSpec, boolean autoSchema) + { + File tmp = FileUtils.createTempDir(); + CLOSER.register(tmp::delete); + return IndexBuilder.create() + .tmpDir(tmp) + .schema( + IncrementalIndexSchema.builder() + .withDimensionsSpec(dimensionsSpec) + .withRollup(false) + .withMinTimestamp(TIMESTAMP.getMillis()) + .withProjections(autoSchema ? AUTO_PROJECTIONS : PROJECTIONS) + .build() + ) + .rows(ROWS); + } private static Set makeArrayResultSet() {