diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIterator.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIterator.java index 6a88a348cc4..30af8febaae 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIterator.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/FilteringCloseableInputRowIterator.java @@ -59,16 +59,21 @@ public class FilteringCloseableInputRowIterator implements CloseableIterator DIMENSIONS = ImmutableList.of("dim1", "dim2"); + private static final List ROWS = ImmutableList.of( + newRow(DateTimes.of("2020-01-01"), 10, 200), + newRow(DateTimes.of("2020-01-01"), 10, 400), + newRow(DateTimes.of("2020-01-01"), 20, 400), + newRow(DateTimes.of("2020-01-01"), 10, 800), + newRow(DateTimes.of("2020-01-01"), 30, 200), + newRow(DateTimes.of("2020-01-01"), 10, 300) + ); private RowIngestionMeters rowIngestionMeters; private ParseExceptionHandler parseExceptionHandler; @@ -65,17 +73,9 @@ public class FilteringCloseableInputRowIteratorTest @Test public void testFilterOutRows() { - final List rows = ImmutableList.of( - newRow(DateTimes.of("2020-01-01"), 10, 200), - newRow(DateTimes.of("2020-01-01"), 10, 400), - newRow(DateTimes.of("2020-01-01"), 20, 400), - newRow(DateTimes.of("2020-01-01"), 10, 800), - newRow(DateTimes.of("2020-01-01"), 30, 200), - newRow(DateTimes.of("2020-01-01"), 10, 300) - ); final Predicate filter = row -> (Integer) row.getRaw("dim1") == 10; final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator( - CloseableIterators.withEmptyBaggage(rows.iterator()), + CloseableIterators.withEmptyBaggage(ROWS.iterator()), filter, rowIngestionMeters, parseExceptionHandler @@ -83,7 +83,7 @@ public class FilteringCloseableInputRowIteratorTest final List filteredRows = new ArrayList<>(); rowIterator.forEachRemaining(filteredRows::add); Assert.assertEquals( - rows.stream().filter(filter).collect(Collectors.toList()), + ROWS.stream().filter(filter).collect(Collectors.toList()), filteredRows ); Assert.assertEquals(2, rowIngestionMeters.getThrownAway()); @@ -92,19 +92,10 @@ public class FilteringCloseableInputRowIteratorTest @Test public void testParseExceptionInDelegateNext() { - final List rows = ImmutableList.of( - newRow(DateTimes.of("2020-01-01"), 10, 200), - newRow(DateTimes.of("2020-01-01"), 10, 400), - newRow(DateTimes.of("2020-01-01"), 20, 400), - newRow(DateTimes.of("2020-01-01"), 10, 800), - newRow(DateTimes.of("2020-01-01"), 30, 200), - newRow(DateTimes.of("2020-01-01"), 10, 300) - ); - // This iterator throws ParseException every other call to next(). final CloseableIterator parseExceptionThrowingIterator = new CloseableIterator() { - final int numRowsToIterate = rows.size() * 2; + final int numRowsToIterate = ROWS.size() * 2; int nextIdx = 0; @Override @@ -118,7 +109,7 @@ public class FilteringCloseableInputRowIteratorTest { final int currentIdx = nextIdx++; if (currentIdx % 2 == 0) { - return rows.get(currentIdx / 2); + return ROWS.get(currentIdx / 2); } else { throw new ParseException("Parse exception at ", currentIdx); } @@ -139,24 +130,16 @@ public class FilteringCloseableInputRowIteratorTest final List filteredRows = new ArrayList<>(); rowIterator.forEachRemaining(filteredRows::add); - Assert.assertEquals(rows, filteredRows); - Assert.assertEquals(rows.size(), rowIngestionMeters.getUnparseable()); + Assert.assertEquals(ROWS, filteredRows); + Assert.assertEquals(ROWS.size(), rowIngestionMeters.getUnparseable()); } @Test public void testParseExceptionInPredicateTest() { - final List rows = ImmutableList.of( - newRow(DateTimes.of("2020-01-01"), 10, 200), - newRow(DateTimes.of("2020-01-01"), 10, 400), - newRow(DateTimes.of("2020-01-01"), 20, 400), - newRow(DateTimes.of("2020-01-01"), 10, 800), - newRow(DateTimes.of("2020-01-01"), 30, 200), - newRow(DateTimes.of("2020-01-01"), 10, 300) - ); final CloseableIterator parseExceptionThrowingIterator = CloseableIterators.withEmptyBaggage( - rows.iterator() + ROWS.iterator() ); // This filter throws ParseException every other call to test(). final Predicate filter = new Predicate() @@ -186,12 +169,104 @@ public class FilteringCloseableInputRowIteratorTest final List filteredRows = new ArrayList<>(); rowIterator.forEachRemaining(filteredRows::add); final List expectedRows = ImmutableList.of( - rows.get(0), - rows.get(2), - rows.get(4) + ROWS.get(0), + ROWS.get(2), + ROWS.get(4) ); Assert.assertEquals(expectedRows, filteredRows); - Assert.assertEquals(rows.size() - expectedRows.size(), rowIngestionMeters.getUnparseable()); + Assert.assertEquals(ROWS.size() - expectedRows.size(), rowIngestionMeters.getUnparseable()); + } + + @Test + public void testParseExceptionInDelegateHasNext() + { + // This iterator throws ParseException every other call to hasNext(). + final CloseableIterator parseExceptionThrowingIterator = new CloseableIterator() + { + final int numRowsToIterate = ROWS.size() * 2; + int currentIndex = 0; + int nextIndex = 0; + + @Override + public boolean hasNext() + { + currentIndex = nextIndex++; + if (currentIndex % 2 == 0) { + return currentIndex < numRowsToIterate; + } else { + throw new ParseException("Parse exception at ", currentIndex); + } + } + + @Override + public InputRow next() + { + return ROWS.get(currentIndex / 2); + } + + @Override + public void close() + { + } + }; + + final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator( + parseExceptionThrowingIterator, + row -> true, + rowIngestionMeters, + parseExceptionHandler + ); + + final List filteredRows = new ArrayList<>(); + rowIterator.forEachRemaining(filteredRows::add); + Assert.assertEquals(ROWS, filteredRows); + Assert.assertEquals(ROWS.size(), rowIngestionMeters.getUnparseable()); + } + + @Test(expected = RuntimeException.class) + public void testNonParseExceptionInDelegateHasNext() + { + // This iterator throws ParseException every other call to hasNext(). + final CloseableIterator parseExceptionThrowingIterator = new CloseableIterator() + { + final int numRowsToIterate = ROWS.size() * 2; + int currentIndex = 0; + int nextIndex = 0; + + @Override + public boolean hasNext() + { + currentIndex = nextIndex++; + if (currentIndex % 2 == 0) { + return currentIndex < numRowsToIterate; + } else { + throw new RuntimeException("should explode"); + } + } + + @Override + public InputRow next() + { + return ROWS.get(currentIndex / 2); + } + + @Override + public void close() + { + } + }; + + final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator( + parseExceptionThrowingIterator, + row -> true, + rowIngestionMeters, + parseExceptionHandler + ); + + while (rowIterator.hasNext()) { + rowIterator.next(); + } + Assert.fail("you never should have come here"); } @Test