From 4c57504960568893dd387d26e2dce1acba8b6b4f Mon Sep 17 00:00:00 2001
From: Laksh Singla
Date: Wed, 13 Sep 2023 10:11:28 +0000
Subject: [PATCH] Fix the uncaught exceptions when materializing results as frames (#14970)

When materializing the results as frames, the creation of the frames is
deferred in ScanQueryQueryToolChest, so any exception thrown while the frames
are actually generated bypasses the catch-all block reserved for the case
where we don't have the complete row signature in the query (which falls back
to the old row-based code). This PR resolves that by moving the frame
generation code into the try-catch block we have at the outer level.
---
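Reviewer note, not part of the commit: below is a minimal, self-contained
sketch of the failure mode described above, under the assumption that the
results are exposed as a lazily evaluated sequence. The class and method names
(LazyCatchSketch, materializeLazily) are hypothetical stand-ins, not Druid
APIs; the point is only that an exception raised while a deferred sequence is
consumed is not caught by a try-catch that wraps just its creation, which is
why the consumption has to move inside the try block.

    import java.util.List;
    import java.util.stream.Stream;

    public class LazyCatchSketch
    {
      // Stand-in for resultsAsFrames(): building the stream never throws,
      // because the mapping function only runs once the stream is consumed.
      static Stream<Integer> materializeLazily(List<String> rows)
      {
        return rows.stream().map(Integer::parseInt);
      }

      public static void main(String[] args)
      {
        List<String> rows = List.of("1", "2", "not-a-number");

        // Before the fix: only the creation sits inside the try, so the
        // NumberFormatException surfaces later and never reaches the
        // catch-all that would have fallen back to the row-based path.
        Stream<Integer> deferred = Stream.empty();
        try {
          deferred = materializeLazily(rows);
        }
        catch (Exception e) {
          System.out.println("fallback (never reached): " + e);
        }
        try {
          deferred.forEach(n -> { });
        }
        catch (Exception e) {
          System.out.println("escaped the intended catch-all: " + e);
        }

        // After the fix: consuming the sequence inside the same try lets the
        // catch-all observe the failure and trigger the fallback.
        try {
          materializeLazily(rows).forEach(n -> { });
        }
        catch (Exception e) {
          System.out.println("caught where the fallback can happen: " + e);
        }
      }
    }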
 .../server/ClientQuerySegmentWalker.java      | 49 ++++++-----
 .../sql/calcite/CalciteSubqueryTest.java      | 86 +++++++++++++++++++
 2 files changed, 112 insertions(+), 23 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
index 9f21fe8e687..c5f858ae163 100644
--- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
+++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
@@ -767,6 +767,32 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
           new ArenaMemoryAllocatorFactory(FRAME_SIZE),
           useNestedForUnknownTypeInSubquery
       );
+
+      if (!framesOptional.isPresent()) {
+        throw DruidException.defensive("Unable to materialize the results as frames. Defaulting to materializing the results as rows");
+      }
+
+      Sequence<FrameSignaturePair> frames = framesOptional.get();
+      List<FrameSignaturePair> frameSignaturePairs = new ArrayList<>();
+      frames.forEach(
+          frame -> {
+            limitAccumulator.addAndGet(frame.getFrame().numRows());
+            if (memoryLimitAccumulator.addAndGet(frame.getFrame().numBytes()) >= memoryLimit) {
+              subqueryStatsProvider.incrementQueriesExceedingByteLimit();
+              throw ResourceLimitExceededException.withMessage(
+                  "Subquery generated results beyond maximum[%d] bytes",
+                  memoryLimit
+              );
+
+            }
+            frameSignaturePairs.add(frame);
+          }
+      );
+      return Optional.of(new FrameBasedInlineDataSource(frameSignaturePairs, toolChest.resultArraySignature(query)));
+
+    }
+    catch (ResourceLimitExceededException e) {
+      throw e;
     }
     catch (UnsupportedColumnTypeException e) {
       subqueryStatsProvider.incrementSubqueriesFallingBackDueToUnsufficientTypeInfo();
@@ -779,29 +805,6 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
                    + "while conversion. Defaulting to materializing the results as rows");
       return Optional.empty();
     }
-
-    if (!framesOptional.isPresent()) {
-      log.debug("Unable to materialize the results as frames. Defaulting to materializing the results as rows");
-      return Optional.empty();
-    }
-
-    Sequence<FrameSignaturePair> frames = framesOptional.get();
-    List<FrameSignaturePair> frameSignaturePairs = new ArrayList<>();
-    frames.forEach(
-        frame -> {
-          limitAccumulator.addAndGet(frame.getFrame().numRows());
-          if (memoryLimitAccumulator.addAndGet(frame.getFrame().numBytes()) >= memoryLimit) {
-            subqueryStatsProvider.incrementQueriesExceedingByteLimit();
-            throw ResourceLimitExceededException.withMessage(
-                "Subquery generated results beyond maximum[%d] bytes",
-                memoryLimit
-            );
-
-          }
-          frameSignaturePairs.add(frame);
-        }
-    );
-    return Optional.of(new FrameBasedInlineDataSource(frameSignaturePairs, toolChest.resultArraySignature(query)));
   }
 
   /**
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java
index 7b21210904a..d39c9bf1388 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java
@@ -951,4 +951,90 @@ public class CalciteSubqueryTest extends BaseCalciteQueryTest
       )
     );
   }
+
+  @Test
+  public void testJoinWithSubqueries()
+  {
+    cannotVectorize();
+
+    List<Object[]> results = new ArrayList<>(ImmutableList.of(
+        new Object[]{"", NullHandling.defaultStringValue()},
+        new Object[]{"10.1", NullHandling.defaultStringValue()},
+        new Object[]{"2", NullHandling.defaultStringValue()},
+        new Object[]{"1", NullHandling.defaultStringValue()},
+        new Object[]{"def", NullHandling.defaultStringValue()},
+        new Object[]{"abc", NullHandling.defaultStringValue()}
+    ));
+
+    if (NullHandling.replaceWithDefault()) {
+      results.add(new Object[]{NullHandling.defaultStringValue(), NullHandling.defaultStringValue()});
+    }
+
+
+    testQuery(
+        "SELECT a.dim1, b.dim2\n"
+        + "FROM (SELECT na.dim1 as dim1, nb.dim2 as dim2 FROM foo na LEFT JOIN foo2 nb ON na.dim1 = nb.dim1) a\n"
+        + "FULL OUTER JOIN\n"
+        + "(SELECT nc.dim1 as dim1, nd.dim2 as dim2 FROM foo nc LEFT JOIN foo2 nd ON nc.dim1 = nd.dim1) b\n"
+        + "ON a.dim1 = b.dim1",
+        queryContext,
+        ImmutableList.of(
+            newScanQueryBuilder()
+                .dataSource(
+                    JoinDataSource.create(
+                        JoinDataSource.create(
+                            new TableDataSource("foo"),
+                            new QueryDataSource(
+                                newScanQueryBuilder()
+                                    .dataSource("foo2")
+                                    .columns("dim1")
+                                    .eternityInterval()
+                                    .build()
+                            ),
+                            "j0.",
+                            "(\"dim1\" == \"j0.dim1\")",
+                            JoinType.LEFT,
+                            null,
+                            ExprMacroTable.nil(),
+                            CalciteTests.createJoinableFactoryWrapper()
+                        ),
+                        new QueryDataSource(
+                            newScanQueryBuilder()
+                                .dataSource(
+                                    JoinDataSource.create(
+                                        new TableDataSource("foo"),
+                                        new QueryDataSource(
+                                            newScanQueryBuilder()
+                                                .dataSource("foo2")
+                                                .columns("dim1", "dim2")
+                                                .eternityInterval()
+                                                .build()
+                                        ),
+                                        "j0.",
+                                        "(\"dim1\" == \"j0.dim1\")",
+                                        JoinType.LEFT,
+                                        null,
+                                        ExprMacroTable.nil(),
+                                        CalciteTests.createJoinableFactoryWrapper()
+                                    )
+                                )
+                                .columns("dim1", "j0.dim2")
+                                .eternityInterval()
+                                .build()
+                        ),
+                        "_j0.",
+                        "(\"dim1\" == \"_j0.dim1\")",
+                        JoinType.FULL,
+                        null,
+                        ExprMacroTable.nil(),
+                        CalciteTests.createJoinableFactoryWrapper()
+                    )
+                )
+                .columns("_j0.j0.dim2", "dim1")
+                .eternityInterval()
+                .build()
+        ),
+        results
+    );
+  }
 }