diff --git a/processing/src/main/java/org/apache/druid/segment/join/HashJoinEngine.java b/processing/src/main/java/org/apache/druid/segment/join/HashJoinEngine.java index e6f67a1683e..9efbcaecdee 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/HashJoinEngine.java +++ b/processing/src/main/java/org/apache/druid/segment/join/HashJoinEngine.java @@ -177,7 +177,7 @@ public class HashJoinEngine @Override public void advance() { - advanceUninterruptibly(); + advance(true); BaseQuery.checkInterrupted(); } @@ -196,6 +196,11 @@ public class HashJoinEngine @Override public void advanceUninterruptibly() + { + advance(false); + } + + private void advance(boolean interruptibly) { joinColumnSelectorFactory.advanceRowId(); @@ -217,7 +222,11 @@ public class HashJoinEngine do { // No more right-hand side matches; advance the left-hand side. - leftCursor.advanceUninterruptibly(); + if (interruptibly) { + leftCursor.advance(); + } else { + leftCursor.advanceUninterruptibly(); + } // Update joinMatcher state to match new cursor position. matchCurrentPosition(); diff --git a/processing/src/test/java/org/apache/druid/segment/join/JoinTestHelper.java b/processing/src/test/java/org/apache/druid/segment/join/JoinTestHelper.java index 0746c104732..86d614cf4ac 100644 --- a/processing/src/test/java/org/apache/druid/segment/join/JoinTestHelper.java +++ b/processing/src/test/java/org/apache/druid/segment/join/JoinTestHelper.java @@ -342,6 +342,7 @@ public class JoinTestHelper .collect(Collectors.toList()); final List rows = new ArrayList<>(); + boolean interruptible = false; // test both advance() and advanceUninterruptibly() while (!cursor.isDone()) { final Object[] row = new Object[columns.size()]; @@ -351,7 +352,13 @@ public class JoinTestHelper } rows.add(row); - cursor.advance(); + if (interruptible) { + cursor.advance(); + } else { + cursor.advanceUninterruptibly(); + } + + interruptible = !interruptible; } return Sequences.simple(rows);