Windowed aggregates should update the aggregation value based on final compute (#16244)

This commit is contained in:
Sree Charan Manamala 2024-04-12 11:58:33 +05:30 committed by GitHub
parent da9feb4430
commit f65c166327
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 34 additions and 14 deletions

View File

@ -318,10 +318,11 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
currentValue, currentValue,
0 0
); );
Aggregator combiningAgg = aggFactory.getCombiningFactory().factorize(combiningFactory);
try (Aggregator combiningAgg = aggFactory.getCombiningFactory().factorize(combiningFactory)) {
combiningAgg.aggregate(); combiningAgg.aggregate();
return combiningAgg.get(); return aggFactory.finalizeComputation(combiningAgg.get());
}
} }
/** /**
@ -458,7 +459,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
if (rowIdProvider.get() < numRows) { if (rowIdProvider.get() < numRows) {
for (int i = 0; i < aggs.length; i++) { for (int i = 0; i < aggs.length; i++) {
aggs[i].aggregate(); aggs[i].aggregate();
results[i][resultStorageIndex] = aggs[i].get(); results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggs[i].get());
aggs[i].close(); aggs[i].close();
aggs[i] = aggFactories[i].factorize(columnSelectorFactory); aggs[i] = aggFactories[i].factorize(columnSelectorFactory);
} }
@ -471,7 +472,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
for (int rowId = rowIdProvider.get(); rowId < numRows; ++rowId) { for (int rowId = rowIdProvider.get(); rowId < numRows; ++rowId) {
for (int i = 0; i < aggs.length; i++) { for (int i = 0; i < aggs.length; i++) {
aggs[i].aggregate(); aggs[i].aggregate();
results[i][resultStorageIndex] = aggs[i].get(); results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggs[i].get());
aggs[i].close(); aggs[i].close();
// Use a combining aggregator to combine the result we just got with the result from the previous row // Use a combining aggregator to combine the result we just got with the result from the previous row
@ -489,7 +490,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
combiningAgg.aggregate(); combiningAgg.aggregate();
combiningFactory.increment(); combiningFactory.increment();
combiningAgg.aggregate(); combiningAgg.aggregate();
results[i][resultStorageIndex] = combiningAgg.get(); results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(combiningAgg.get());
combiningAgg.close(); combiningAgg.close();
aggs[i] = aggFactories[i].factorize(columnSelectorFactory); aggs[i] = aggFactories[i].factorize(columnSelectorFactory);
@ -545,7 +546,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
if (rowIdProvider.get() >= 0) { if (rowIdProvider.get() >= 0) {
for (int i = 0; i < aggs.length; i++) { for (int i = 0; i < aggs.length; i++) {
aggs[i].aggregate(); aggs[i].aggregate();
results[i][resultStorageIndex] = aggs[i].get(); results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggs[i].get());
aggs[i].close(); aggs[i].close();
aggs[i] = aggFactories[i].factorize(columnSelectorFactory); aggs[i] = aggFactories[i].factorize(columnSelectorFactory);
} }
@ -558,7 +559,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
for (int rowId = rowIdProvider.get(); rowId >= 0; --rowId) { for (int rowId = rowIdProvider.get(); rowId >= 0; --rowId) {
for (int i = 0; i < aggs.length; i++) { for (int i = 0; i < aggs.length; i++) {
aggs[i].aggregate(); aggs[i].aggregate();
results[i][resultStorageIndex] = aggs[i].get(); results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggs[i].get());
aggs[i].close(); aggs[i].close();
// Use a combining aggregator to combine the result we just got with the result from the previous row // Use a combining aggregator to combine the result we just got with the result from the previous row
@ -576,7 +577,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
combiningAgg.aggregate(); combiningAgg.aggregate();
combiningFactory.decrement(); combiningFactory.decrement();
combiningAgg.aggregate(); combiningAgg.aggregate();
results[i][resultStorageIndex] = combiningAgg.get(); results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(combiningAgg.get());
combiningAgg.close(); combiningAgg.close();
aggs[i] = aggFactories[i].factorize(columnSelectorFactory); aggs[i] = aggFactories[i].factorize(columnSelectorFactory);
@ -667,7 +668,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
} }
for (int i = 0; i < aggFactories.length; ++i) { for (int i = 0; i < aggFactories.length; ++i) {
results[i][resultStorageIndex] = aggregators[i][nextIndex].get(); results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggregators[i][nextIndex].get());
aggregators[i][nextIndex].close(); aggregators[i][nextIndex].close();
aggregators[i][nextIndex] = aggFactories[i].factorize(columnSelectorFactory); aggregators[i][nextIndex] = aggFactories[i].factorize(columnSelectorFactory);
} }
@ -706,7 +707,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
} }
for (int i = 0; i < aggFactories.length; ++i) { for (int i = 0; i < aggFactories.length; ++i) {
results[i][resultStorageIndex] = aggregators[i][nextIndex].get(); results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggregators[i][nextIndex].get());
aggregators[i][nextIndex].close(); aggregators[i][nextIndex].close();
aggregators[i][nextIndex] = null; aggregators[i][nextIndex] = null;
} }
@ -719,7 +720,8 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
// End Phase 3, anything left in the window needs to be collected and put into our results // End Phase 3, anything left in the window needs to be collected and put into our results
for (; nextIndex < windowSize; ++nextIndex) { for (; nextIndex < windowSize; ++nextIndex) {
for (int i = 0; i < aggFactories.length; ++i) { for (int i = 0; i < aggFactories.length; ++i) {
results[i][resultStorageIndex] = aggregators[i][nextIndex].get(); results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggregators[i][nextIndex].get());
aggregators[i][nextIndex].close();
aggregators[i][nextIndex] = null; aggregators[i][nextIndex] = null;
} }
++resultStorageIndex; ++resultStorageIndex;
@ -772,7 +774,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
if (rowId >= upperOffset) { if (rowId >= upperOffset) {
for (int i = 0; i < aggregators.length; ++i) { for (int i = 0; i < aggregators.length; ++i) {
results[i][resultStorageIndex] = aggregators[i][startIndex].get(); results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggregators[i][startIndex].get());
aggregators[i][startIndex].close(); aggregators[i][startIndex].close();
aggregators[i][startIndex] = null; aggregators[i][startIndex] = null;
} }
@ -790,7 +792,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
for (; startIndex < windowSize; ++startIndex) { for (; startIndex < windowSize; ++startIndex) {
for (int i = 0; i < aggregators.length; ++i) { for (int i = 0; i < aggregators.length; ++i) {
results[i][resultStorageIndex] = aggregators[i][startIndex].get(); results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggregators[i][startIndex].get());
aggregators[i][startIndex].close(); aggregators[i][startIndex].close();
aggregators[i][startIndex] = null; aggregators[i][startIndex] = null;
} }

View File

@ -0,0 +1,18 @@
type: "operatorValidation"
sql: |
SELECT
countryName,
cityName,
channel,
string_agg(channel, '|') over (partition by cityName order by countryName) s
FROM wikipedia
WHERE countryName='Austria'
GROUP BY 1, 2, 3
expectedResults:
- ["Austria",null,"#de.wikipedia","#de.wikipedia"]
- ["Austria","Horsching","#de.wikipedia","#de.wikipedia"]
- ["Austria","Vienna","#de.wikipedia","#de.wikipedia|#es.wikipedia|#tr.wikipedia"]
- ["Austria","Vienna","#es.wikipedia","#de.wikipedia|#es.wikipedia|#tr.wikipedia"]
- ["Austria","Vienna","#tr.wikipedia","#de.wikipedia|#es.wikipedia|#tr.wikipedia"]