HashJoinEngine: Check for interruptions while walking left cursor. (#16773)

* HashJoinEngine: Check for interruptions while walking left cursor.

Previously, the engine only checked for interruptions between emitting
joined rows. In scenarios where large numbers of left rows are skipped
completely (such as a highly selective INNER JOIN) this led to the
join cursor being insufficiently responsive to cancellation.

* Coverage.
This commit is contained in:
Gian Merlino 2024-07-25 00:10:50 -07:00 committed by GitHub
parent 5da69a01cb
commit c1875e7c1d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 19 additions and 3 deletions

View File

@ -177,7 +177,7 @@ public class HashJoinEngine
@Override
public void advance()
{
advanceUninterruptibly();
advance(true);
BaseQuery.checkInterrupted();
}
@ -196,6 +196,11 @@ public class HashJoinEngine
@Override
public void advanceUninterruptibly()
{
advance(false);
}
private void advance(boolean interruptibly)
{
joinColumnSelectorFactory.advanceRowId();
@ -217,7 +222,11 @@ public class HashJoinEngine
do {
// No more right-hand side matches; advance the left-hand side.
leftCursor.advanceUninterruptibly();
if (interruptibly) {
leftCursor.advance();
} else {
leftCursor.advanceUninterruptibly();
}
// Update joinMatcher state to match new cursor position.
matchCurrentPosition();

View File

@ -342,6 +342,7 @@ public class JoinTestHelper
.collect(Collectors.toList());
final List<Object[]> rows = new ArrayList<>();
boolean interruptible = false; // test both advance() and advanceUninterruptibly()
while (!cursor.isDone()) {
final Object[] row = new Object[columns.size()];
@ -351,7 +352,13 @@ public class JoinTestHelper
}
rows.add(row);
cursor.advance();
if (interruptible) {
cursor.advance();
} else {
cursor.advanceUninterruptibly();
}
interruptible = !interruptible;
}
return Sequences.simple(rows);