diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java index 8b34b62f06f..106fa9674a0 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java @@ -318,10 +318,11 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable currentValue, 0 ); - Aggregator combiningAgg = aggFactory.getCombiningFactory().factorize(combiningFactory); - combiningAgg.aggregate(); - return combiningAgg.get(); + try (Aggregator combiningAgg = aggFactory.getCombiningFactory().factorize(combiningFactory)) { + combiningAgg.aggregate(); + return aggFactory.finalizeComputation(combiningAgg.get()); + } } /** @@ -458,7 +459,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable if (rowIdProvider.get() < numRows) { for (int i = 0; i < aggs.length; i++) { aggs[i].aggregate(); - results[i][resultStorageIndex] = aggs[i].get(); + results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggs[i].get()); aggs[i].close(); aggs[i] = aggFactories[i].factorize(columnSelectorFactory); } @@ -471,7 +472,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable for (int rowId = rowIdProvider.get(); rowId < numRows; ++rowId) { for (int i = 0; i < aggs.length; i++) { aggs[i].aggregate(); - results[i][resultStorageIndex] = aggs[i].get(); + results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggs[i].get()); aggs[i].close(); // Use a combining aggregator to combine the result we just got with the result from the previous row @@ -489,7 +490,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable combiningAgg.aggregate(); combiningFactory.increment(); combiningAgg.aggregate(); - results[i][resultStorageIndex] = combiningAgg.get(); + results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(combiningAgg.get()); combiningAgg.close(); aggs[i] = aggFactories[i].factorize(columnSelectorFactory); @@ -545,7 +546,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable if (rowIdProvider.get() >= 0) { for (int i = 0; i < aggs.length; i++) { aggs[i].aggregate(); - results[i][resultStorageIndex] = aggs[i].get(); + results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggs[i].get()); aggs[i].close(); aggs[i] = aggFactories[i].factorize(columnSelectorFactory); } @@ -558,7 +559,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable for (int rowId = rowIdProvider.get(); rowId >= 0; --rowId) { for (int i = 0; i < aggs.length; i++) { aggs[i].aggregate(); - results[i][resultStorageIndex] = aggs[i].get(); + results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggs[i].get()); aggs[i].close(); // Use a combining aggregator to combine the result we just got with the result from the previous row @@ -576,7 +577,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable combiningAgg.aggregate(); combiningFactory.decrement(); combiningAgg.aggregate(); - results[i][resultStorageIndex] = combiningAgg.get(); + results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(combiningAgg.get()); combiningAgg.close(); aggs[i] = aggFactories[i].factorize(columnSelectorFactory); @@ -667,7 +668,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable } for (int i = 0; i < aggFactories.length; ++i) { - results[i][resultStorageIndex] = aggregators[i][nextIndex].get(); + results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggregators[i][nextIndex].get()); aggregators[i][nextIndex].close(); aggregators[i][nextIndex] = aggFactories[i].factorize(columnSelectorFactory); } @@ -706,7 +707,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable } for (int i = 0; i < aggFactories.length; ++i) { - results[i][resultStorageIndex] = aggregators[i][nextIndex].get(); + results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggregators[i][nextIndex].get()); aggregators[i][nextIndex].close(); aggregators[i][nextIndex] = null; } @@ -719,7 +720,8 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable // End Phase 3, anything left in the window needs to be collected and put into our results for (; nextIndex < windowSize; ++nextIndex) { for (int i = 0; i < aggFactories.length; ++i) { - results[i][resultStorageIndex] = aggregators[i][nextIndex].get(); + results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggregators[i][nextIndex].get()); + aggregators[i][nextIndex].close(); aggregators[i][nextIndex] = null; } ++resultStorageIndex; @@ -772,7 +774,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable if (rowId >= upperOffset) { for (int i = 0; i < aggregators.length; ++i) { - results[i][resultStorageIndex] = aggregators[i][startIndex].get(); + results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggregators[i][startIndex].get()); aggregators[i][startIndex].close(); aggregators[i][startIndex] = null; } @@ -790,7 +792,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable for (; startIndex < windowSize; ++startIndex) { for (int i = 0; i < aggregators.length; ++i) { - results[i][resultStorageIndex] = aggregators[i][startIndex].get(); + results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggregators[i][startIndex].get()); aggregators[i][startIndex].close(); aggregators[i][startIndex] = null; } diff --git a/sql/src/test/resources/calcite/tests/window/wikipediaFinalComputedAggregation.sqlTest b/sql/src/test/resources/calcite/tests/window/wikipediaFinalComputedAggregation.sqlTest new file mode 100644 index 00000000000..c45dcb8b688 --- /dev/null +++ b/sql/src/test/resources/calcite/tests/window/wikipediaFinalComputedAggregation.sqlTest @@ -0,0 +1,18 @@ +type: "operatorValidation" + +sql: | + SELECT + countryName, + cityName, + channel, + string_agg(channel, '|') over (partition by cityName order by countryName) s + FROM wikipedia + WHERE countryName='Austria' + GROUP BY 1, 2, 3 + +expectedResults: + - ["Austria",null,"#de.wikipedia","#de.wikipedia"] + - ["Austria","Horsching","#de.wikipedia","#de.wikipedia"] + - ["Austria","Vienna","#de.wikipedia","#de.wikipedia|#es.wikipedia|#tr.wikipedia"] + - ["Austria","Vienna","#es.wikipedia","#de.wikipedia|#es.wikipedia|#tr.wikipedia"] + - ["Austria","Vienna","#tr.wikipedia","#de.wikipedia|#es.wikipedia|#tr.wikipedia"] \ No newline at end of file