fix parse exception handling for stream parsers (#11556)

* fix parse exception handling

* fix style and inspections
This commit is contained in:
Clint Wylie 2021-08-09 12:40:44 -07:00 committed by GitHub
parent 06bae29979
commit f2ac6cd96e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 124 additions and 44 deletions

View File

@ -59,16 +59,21 @@ public class FilteringCloseableInputRowIterator implements CloseableIterator<Inp
@Override
public boolean hasNext()
{
while (next == null && delegate.hasNext()) {
while (true) {
try {
// delegate.next() can throw ParseException
final InputRow row = delegate.next();
// filter.test() can throw ParseException
if (filter.test(row)) {
next = row;
} else {
rowIngestionMeters.incrementThrownAway();
// delegate.hasNext() can throw ParseException, since some types of delegating iterators will call next on
// their underlying iterator
while (next == null && delegate.hasNext()) {
// delegate.next() can throw ParseException
final InputRow row = delegate.next();
// filter.test() can throw ParseException
if (filter.test(row)) {
next = row;
} else {
rowIngestionMeters.incrementThrownAway();
}
}
break;
}
catch (ParseException e) {
parseExceptionHandler.handle(e);

View File

@ -46,6 +46,14 @@ import java.util.stream.Collectors;
public class FilteringCloseableInputRowIteratorTest
{
private static final List<String> DIMENSIONS = ImmutableList.of("dim1", "dim2");
private static final List<InputRow> ROWS = ImmutableList.of(
newRow(DateTimes.of("2020-01-01"), 10, 200),
newRow(DateTimes.of("2020-01-01"), 10, 400),
newRow(DateTimes.of("2020-01-01"), 20, 400),
newRow(DateTimes.of("2020-01-01"), 10, 800),
newRow(DateTimes.of("2020-01-01"), 30, 200),
newRow(DateTimes.of("2020-01-01"), 10, 300)
);
private RowIngestionMeters rowIngestionMeters;
private ParseExceptionHandler parseExceptionHandler;
@ -65,17 +73,9 @@ public class FilteringCloseableInputRowIteratorTest
@Test
public void testFilterOutRows()
{
final List<InputRow> rows = ImmutableList.of(
newRow(DateTimes.of("2020-01-01"), 10, 200),
newRow(DateTimes.of("2020-01-01"), 10, 400),
newRow(DateTimes.of("2020-01-01"), 20, 400),
newRow(DateTimes.of("2020-01-01"), 10, 800),
newRow(DateTimes.of("2020-01-01"), 30, 200),
newRow(DateTimes.of("2020-01-01"), 10, 300)
);
final Predicate<InputRow> filter = row -> (Integer) row.getRaw("dim1") == 10;
final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator(
CloseableIterators.withEmptyBaggage(rows.iterator()),
CloseableIterators.withEmptyBaggage(ROWS.iterator()),
filter,
rowIngestionMeters,
parseExceptionHandler
@ -83,7 +83,7 @@ public class FilteringCloseableInputRowIteratorTest
final List<InputRow> filteredRows = new ArrayList<>();
rowIterator.forEachRemaining(filteredRows::add);
Assert.assertEquals(
rows.stream().filter(filter).collect(Collectors.toList()),
ROWS.stream().filter(filter).collect(Collectors.toList()),
filteredRows
);
Assert.assertEquals(2, rowIngestionMeters.getThrownAway());
@ -92,19 +92,10 @@ public class FilteringCloseableInputRowIteratorTest
@Test
public void testParseExceptionInDelegateNext()
{
final List<InputRow> rows = ImmutableList.of(
newRow(DateTimes.of("2020-01-01"), 10, 200),
newRow(DateTimes.of("2020-01-01"), 10, 400),
newRow(DateTimes.of("2020-01-01"), 20, 400),
newRow(DateTimes.of("2020-01-01"), 10, 800),
newRow(DateTimes.of("2020-01-01"), 30, 200),
newRow(DateTimes.of("2020-01-01"), 10, 300)
);
// This iterator throws ParseException every other call to next().
final CloseableIterator<InputRow> parseExceptionThrowingIterator = new CloseableIterator<InputRow>()
{
final int numRowsToIterate = rows.size() * 2;
final int numRowsToIterate = ROWS.size() * 2;
int nextIdx = 0;
@Override
@ -118,7 +109,7 @@ public class FilteringCloseableInputRowIteratorTest
{
final int currentIdx = nextIdx++;
if (currentIdx % 2 == 0) {
return rows.get(currentIdx / 2);
return ROWS.get(currentIdx / 2);
} else {
throw new ParseException("Parse exception at ", currentIdx);
}
@ -139,24 +130,16 @@ public class FilteringCloseableInputRowIteratorTest
final List<InputRow> filteredRows = new ArrayList<>();
rowIterator.forEachRemaining(filteredRows::add);
Assert.assertEquals(rows, filteredRows);
Assert.assertEquals(rows.size(), rowIngestionMeters.getUnparseable());
Assert.assertEquals(ROWS, filteredRows);
Assert.assertEquals(ROWS.size(), rowIngestionMeters.getUnparseable());
}
@Test
public void testParseExceptionInPredicateTest()
{
final List<InputRow> rows = ImmutableList.of(
newRow(DateTimes.of("2020-01-01"), 10, 200),
newRow(DateTimes.of("2020-01-01"), 10, 400),
newRow(DateTimes.of("2020-01-01"), 20, 400),
newRow(DateTimes.of("2020-01-01"), 10, 800),
newRow(DateTimes.of("2020-01-01"), 30, 200),
newRow(DateTimes.of("2020-01-01"), 10, 300)
);
final CloseableIterator<InputRow> parseExceptionThrowingIterator = CloseableIterators.withEmptyBaggage(
rows.iterator()
ROWS.iterator()
);
// This filter throws ParseException every other call to test().
final Predicate<InputRow> filter = new Predicate<InputRow>()
@ -186,12 +169,104 @@ public class FilteringCloseableInputRowIteratorTest
final List<InputRow> filteredRows = new ArrayList<>();
rowIterator.forEachRemaining(filteredRows::add);
final List<InputRow> expectedRows = ImmutableList.of(
rows.get(0),
rows.get(2),
rows.get(4)
ROWS.get(0),
ROWS.get(2),
ROWS.get(4)
);
Assert.assertEquals(expectedRows, filteredRows);
Assert.assertEquals(rows.size() - expectedRows.size(), rowIngestionMeters.getUnparseable());
Assert.assertEquals(ROWS.size() - expectedRows.size(), rowIngestionMeters.getUnparseable());
}
@Test
public void testParseExceptionInDelegateHasNext()
{
// This iterator throws ParseException every other call to hasNext().
final CloseableIterator<InputRow> parseExceptionThrowingIterator = new CloseableIterator<InputRow>()
{
final int numRowsToIterate = ROWS.size() * 2;
int currentIndex = 0;
int nextIndex = 0;
@Override
public boolean hasNext()
{
currentIndex = nextIndex++;
if (currentIndex % 2 == 0) {
return currentIndex < numRowsToIterate;
} else {
throw new ParseException("Parse exception at ", currentIndex);
}
}
@Override
public InputRow next()
{
return ROWS.get(currentIndex / 2);
}
@Override
public void close()
{
}
};
final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator(
parseExceptionThrowingIterator,
row -> true,
rowIngestionMeters,
parseExceptionHandler
);
final List<InputRow> filteredRows = new ArrayList<>();
rowIterator.forEachRemaining(filteredRows::add);
Assert.assertEquals(ROWS, filteredRows);
Assert.assertEquals(ROWS.size(), rowIngestionMeters.getUnparseable());
}
@Test(expected = RuntimeException.class)
public void testNonParseExceptionInDelegateHasNext()
{
// This iterator throws ParseException every other call to hasNext().
final CloseableIterator<InputRow> parseExceptionThrowingIterator = new CloseableIterator<InputRow>()
{
final int numRowsToIterate = ROWS.size() * 2;
int currentIndex = 0;
int nextIndex = 0;
@Override
public boolean hasNext()
{
currentIndex = nextIndex++;
if (currentIndex % 2 == 0) {
return currentIndex < numRowsToIterate;
} else {
throw new RuntimeException("should explode");
}
}
@Override
public InputRow next()
{
return ROWS.get(currentIndex / 2);
}
@Override
public void close()
{
}
};
final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator(
parseExceptionThrowingIterator,
row -> true,
rowIngestionMeters,
parseExceptionHandler
);
while (rowIterator.hasNext()) {
rowIterator.next();
}
Assert.fail("you never should have come here");
}
@Test