PostJoinCursor should never advance without interruption (#17099)

This commit is contained in:
Abhishek Agarwal 2024-09-19 12:40:59 +05:30 committed by GitHub
parent 953fe11e31
commit 8d1e596740
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 28 additions and 21 deletions

View File

@ -69,7 +69,8 @@ public interface Cursor
/**
* Advance to the cursor to the next position. Callers should check {@link #isDone()} or
* {@link #isDoneOrInterrupted()} before getting the next value from a selector.
* {@link #isDoneOrInterrupted()} before getting the next value from a selector. However, underlying
* implementation may still check for thread interruption if advancing the cursor is a long-running operation.
*/
void advanceUninterruptibly();

View File

@ -84,21 +84,6 @@ public class PostJoinCursor implements Cursor
}
}
/**
* Matches tuples coming out of a join to a post-join condition uninterruptibly, and hence can be a long-running call.
* For this reason, {@link PostJoinCursor#advance()} instead calls {@link PostJoinCursor#advanceToMatch()} (unlike
* other cursors) that allows interruptions, thereby resolving issues where the
* <a href="https://github.com/apache/druid/issues/14514">CPU thread running PostJoinCursor cannot be terminated</a>
*/
private void advanceToMatchUninterruptibly()
{
if (valueMatcher != null) {
while (!isDone() && !valueMatcher.matches(false)) {
baseCursor.advanceUninterruptibly();
}
}
}
@Override
public ColumnSelectorFactory getColumnSelectorFactory()
{
@ -120,11 +105,17 @@ public class PostJoinCursor implements Cursor
advanceToMatch();
}
/**
* Advancing the post-join requires evaluating the join on whole segment and advancing without interruption can take
* a long time if there are no matches but the join itself is big. This can leave the thread running well after
* the timeout elapses. One such issue is described in
* <a href="https://github.com/apache/druid/issues/14514">CPU thread running PostJoinCursor cannot be terminated</a>
*/
@Override
public void advanceUninterruptibly()
{
baseCursor.advanceUninterruptibly();
advanceToMatchUninterruptibly();
advance();
}
@Override

View File

@ -200,6 +200,17 @@ public class PostJoinCursorTest extends BaseHashJoinSegmentCursorFactoryTest
@Test
public void testAdvanceWithInterruption() throws IOException, InterruptedException
{
testAdvance(true);
}
@Test
public void testAdvanceWithoutInterruption() throws IOException, InterruptedException
{
testAdvance(false);
}
private void testAdvance(boolean withInterruption) throws IOException, InterruptedException
{
final int rowsBeforeInterrupt = 1000;
@ -214,7 +225,7 @@ public class PostJoinCursorTest extends BaseHashJoinSegmentCursorFactoryTest
countriesTable = JoinTestHelper.createCountriesIndexedTable();
Thread joinCursorThread = new Thread(() -> makeCursorAndAdvance());
Thread joinCursorThread = new Thread(() -> makeCursorAndAdvance(withInterruption));
ExceptionHandler exceptionHandler = new ExceptionHandler();
joinCursorThread.setUncaughtExceptionHandler(exceptionHandler);
joinCursorThread.start();
@ -234,7 +245,7 @@ public class PostJoinCursorTest extends BaseHashJoinSegmentCursorFactoryTest
fail();
}
public void makeCursorAndAdvance()
public void makeCursorAndAdvance(boolean withInterruption)
{
List<JoinableClause> joinableClauses = ImmutableList.of(
@ -272,7 +283,11 @@ public class PostJoinCursorTest extends BaseHashJoinSegmentCursorFactoryTest
}
});
if (withInterruption) {
cursor.advance();
} else {
cursor.advanceUninterruptibly();
}
}
}
}