Fix uncaught `ParseException` when reading Avro from Kafka (#14183)

In StreamChunkParser#parseWithInputFormat, we call byteEntityReader.read() without handling the ParseException that the delegate AvroStreamReader#intermediateRowIterator can throw during this call.
A ParseException can be thrown when an Avro stream contains corrupt data, data that doesn't conform to the specified schema, or data that otherwise fails to decode. If uncaught, this exception can cause ingestion to fail.
Abhishek Radhakrishnan 2023-05-04 00:05:36 -07:00 committed by GitHub
parent 954f3917ef
commit 68f908e511
4 changed files with 337 additions and 14 deletions
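
For context, the sketch below shows the catch-and-handle pattern this commit applies in StreamChunkParser#parseWithInputFormat (see the StreamChunkParser.java diff further down). It is a simplified illustration, not the committed code: the helper class and method names are made up, and the row filtering and transformation done by the real parser are omitted.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.segment.incremental.ParseExceptionHandler;

// Hypothetical helper for illustration only; the class and method names are not part of the commit.
final class ParseWithHandlerSketch
{
  static List<InputRow> readAll(InputEntityReader reader, ParseExceptionHandler parseExceptionHandler)
      throws IOException
  {
    final List<InputRow> rows = new ArrayList<>();
    try (CloseableIterator<InputRow> rowIterator = reader.read()) {
      // For Avro over Kafka, the delegate AvroStreamReader#intermediateRowIterator may throw
      // ParseException here when the stream contains corrupt or schema-incompatible data.
      rowIterator.forEachRemaining(rows::add);
    }
    catch (ParseException pe) {
      // Route the exception to the handler, which records the row as unparseable and fails
      // the task only once its configured maxParseExceptions limit is exceeded.
      parseExceptionHandler.handle(pe);
    }
    return rows;
  }
}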

SchemaRegistryBasedAvroBytesDecoder.java

@@ -140,17 +140,20 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder
schema = parsedSchema instanceof AvroSchema ? ((AvroSchema) parsedSchema).rawSchema() : null;
}
catch (IOException | RestClientException ex) {
throw new ParseException(null, ex, "Failed to fetch Avro schema from registry: %s", id);
throw new ParseException(null, ex, "Failed to fetch Avro schema id[%s] from registry. Check if the schema "
+ "exists in the registry. Otherwise it could mean that there is "
+ "malformed data in the stream or data that doesn't conform to the schema "
+ "specified.", id);
}
if (schema == null) {
throw new ParseException(null, "No Avro schema in registry: %s", id);
throw new ParseException(null, "No Avro schema id[%s] in registry", id);
}
DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
try {
return reader.read(null, DecoderFactory.get().binaryDecoder(bytes.array(), offset, length, null));
}
catch (Exception e) {
throw new ParseException(null, e, "Failed to decode Avro message for schema: %s", id);
throw new ParseException(null, e, "Failed to decode Avro message for schema id[%s]", id);
}
}

KafkaIndexTaskTest.java

@@ -78,6 +78,7 @@ import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@@ -180,7 +181,8 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
private static int topicPostfix;
static final Module TEST_MODULE = new SimpleModule("kafkaTestModule").registerSubtypes(
new NamedType(TestKafkaInputFormat.class, "testKafkaInputFormat")
new NamedType(TestKafkaInputFormat.class, "testKafkaInputFormat"),
new NamedType(TestKafkaFormatWithMalformedDataDetection.class, "testKafkaFormatWithMalformedDataDetection")
);
static {
@@ -2996,6 +2998,80 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
);
}
@Test(timeout = 60_000L)
public void testParseExceptionsInIteratorConstructionSuccess() throws Exception
{
reportParseExceptions = false;
maxParseExceptions = 2;
maxSavedParseExceptions = 2;
// Prepare records and insert data
records = Arrays.asList(
new ProducerRecord<byte[], byte[]>(topic, 0, null,
jbb("2049", "a", "y", "10", "20.0", "1.0"),
SAMPLE_HEADERS),
new ProducerRecord<byte[], byte[]>(topic, 0, null,
jbb("200", TestKafkaFormatWithMalformedDataDetection.MALFORMED_KEY, "y", "10", "20.0", "1.0"),
SAMPLE_HEADERS),
new ProducerRecord<byte[], byte[]>(topic, 0, null,
jbb("2009", TestKafkaFormatWithMalformedDataDetection.MALFORMED_KEY, "y", "10", "20.0", "1.0"),
SAMPLE_HEADERS),
new ProducerRecord<byte[], byte[]>(topic, 0, null,
jbb("2049", "b", "y", "10", "21.0", "1.0"),
SAMPLE_HEADERS)
);
insertData();
final KafkaIndexTask task = createTask(
null,
new KafkaIndexTaskIOConfig(
0,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 0L), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 4L)),
kafkaServer.consumerProperties(),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null,
new TestKafkaFormatWithMalformedDataDetection(INPUT_FORMAT),
null
)
);
Assert.assertTrue(task.supportsQueries());
final ListenableFuture<TaskStatus> future = runTask(task);
// Wait for task to exit
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(0, 4)).unparseable(2).totalProcessed(2));
// Check published metadata
assertEqualsExceptVersion(
ImmutableList.of(
// only 2 rows end up in Druid
sdd("2049/P1D", 0, ImmutableList.of("a", "b"))
),
publishedDescriptors()
);
Assert.assertEquals(
new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 4L))),
newDataSchemaMetadata()
);
// Verify unparseable data
IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();
ParseExceptionReport parseExceptionReport =
ParseExceptionReport.forPhase(reportData, RowIngestionMeters.BUILD_SEGMENTS);
List<String> expectedMessages = Arrays.asList(
"Unable to parse malformed data during iterator construction",
"Unable to parse malformed data during iterator construction"
);
Assert.assertEquals(expectedMessages, parseExceptionReport.getErrorMessages());
}
public static class TestKafkaInputFormat implements InputFormat
{
final InputFormat baseInputFormat;
@@ -3056,4 +3132,77 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
return baseInputFormat;
}
}
/**
* This test class is a kafka input format that throws a {@link ParseException} when it encounters a malformed value
* in its input as part of the iterator construction. This should be used only for testing purposes.
*/
public static class TestKafkaFormatWithMalformedDataDetection implements InputFormat
{
public static final String MALFORMED_KEY = "malformed";
final InputFormat baseInputFormat;
@JsonCreator
public TestKafkaFormatWithMalformedDataDetection(@JsonProperty("baseInputFormat") InputFormat baseInputFormat)
{
this.baseInputFormat = baseInputFormat;
}
@Override
public boolean isSplittable()
{
return false;
}
@Override
public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
{
final InputEntityReader delegate = baseInputFormat.createReader(inputRowSchema, source, temporaryDirectory);
final SettableByteEntity<KafkaRecordEntity> settableByteEntity = (SettableByteEntity<KafkaRecordEntity>) source;
return new InputEntityReader()
{
@Override
public CloseableIterator<InputRow> read() throws IOException
{
KafkaRecordEntity recordEntity = settableByteEntity.getEntity();
return delegate.read().map(
r -> {
MapBasedInputRow row = (MapBasedInputRow) r;
final Map<String, Object> event = new HashMap<>(row.getEvent());
if (event.containsValue(MALFORMED_KEY)) {
// Then throw an exception
throw new ParseException(null, "Unable to parse malformed data during iterator construction");
}
event.put("kafka.offset", recordEntity.getRecord().offset());
event.put("kafka.topic", recordEntity.getRecord().topic());
event.put(
"kafka.header.encoding",
new String(
recordEntity.getRecord().headers().lastHeader("encoding").value(),
StandardCharsets.UTF_8
)
);
return new MapBasedInputRow(row.getTimestamp(), row.getDimensions(), event);
}
);
}
@Override
public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
{
return delegate.sample();
}
};
}
@JsonProperty
public InputFormat getBaseInputFormat()
{
return baseInputFormat;
}
}
}

StreamChunkParser.java

@@ -19,6 +19,7 @@
package org.apache.druid.indexing.seekablestream;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Lists;
import org.apache.druid.data.input.InputFormat;
@@ -29,6 +30,7 @@ import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.indexing.common.task.FilteringCloseableInputRowIterator;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.transform.TransformSpec;
@@ -44,7 +46,7 @@ import java.util.function.Predicate;
/**
* Abstraction for parsing stream data which internally uses {@link org.apache.druid.data.input.InputEntityReader}
* or {@link InputRowParser}. This class will be useful untill we remove the deprecated InputRowParser.
* or {@link InputRowParser}. This class will be useful until we remove the deprecated {@link InputRowParser}.
*/
class StreamChunkParser<RecordType extends ByteEntity>
{
@@ -90,6 +92,25 @@ class StreamChunkParser<RecordType extends ByteEntity>
this.parseExceptionHandler = parseExceptionHandler;
}
@VisibleForTesting
StreamChunkParser(
@Nullable InputRowParser<ByteBuffer> parser,
@Nullable SettableByteEntityReader<RecordType> byteEntityReader,
Predicate<InputRow> rowFilter,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler
)
{
if (parser == null && byteEntityReader == null) {
throw new IAE("Either parser or byteEntityReader should be set");
}
this.parser = parser;
this.byteEntityReader = byteEntityReader;
this.rowFilter = rowFilter;
this.rowIngestionMeters = rowIngestionMeters;
this.parseExceptionHandler = parseExceptionHandler;
}
List<InputRow> parse(@Nullable List<RecordType> streamChunk, boolean isEndOfShard) throws IOException
{
if (streamChunk == null || streamChunk.isEmpty()) {
@@ -152,6 +173,9 @@ class StreamChunkParser<RecordType extends ByteEntity>
)) {
rowIterator.forEachRemaining(rows::add);
}
catch (ParseException pe) {
parseExceptionHandler.handle(pe);
}
}
return rows;
}

StreamChunkParserTest.java

@@ -34,11 +34,14 @@ import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.segment.incremental.NoopRowIngestionMeters;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
import org.apache.druid.segment.transform.TransformSpec;
import org.junit.Assert;
import org.junit.Rule;
@@ -46,6 +49,7 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
@@ -53,6 +57,7 @@ import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -68,7 +73,7 @@ public class StreamChunkParserTest
@Rule
public ExpectedException expectedException = ExpectedException.none();
private final RowIngestionMeters rowIngestionMeters = new NoopRowIngestionMeters();
private final RowIngestionMeters rowIngestionMeters = new SimpleRowIngestionMeters();
private final ParseExceptionHandler parseExceptionHandler = new ParseExceptionHandler(
rowIngestionMeters,
false,
@@ -76,6 +81,9 @@ public class StreamChunkParserTest
0
);
@Mock
private SettableByteEntityReader mockedByteEntityReader;
@Test
public void testWithParserAndNullInputformatParseProperly() throws IOException
{
@@ -91,7 +99,7 @@
);
final StreamChunkParser<ByteEntity> chunkParser = new StreamChunkParser<>(
parser,
// Set nulls for all parameters below since inputFormat will be never used.
// Set nulls for all parameters below since inputFormat will never be used.
null,
null,
null,
@@ -176,7 +184,6 @@
JSONPathSpec.DEFAULT,
Collections.emptyMap()
);
RowIngestionMeters mockRowIngestionMeters = Mockito.mock(RowIngestionMeters.class);
final StreamChunkParser<ByteEntity> chunkParser = new StreamChunkParser<>(
null,
inputFormat,
@@ -184,12 +191,13 @@
TransformSpec.NONE,
temporaryFolder.newFolder(),
row -> true,
mockRowIngestionMeters,
rowIngestionMeters,
parseExceptionHandler
);
List<InputRow> parsedRows = chunkParser.parse(ImmutableList.of(), false);
Assert.assertEquals(0, parsedRows.size());
Mockito.verify(mockRowIngestionMeters).incrementThrownAway();
Assert.assertEquals(0, rowIngestionMeters.getUnparseable());
Assert.assertEquals(1, rowIngestionMeters.getThrownAway());
}
@Test
@@ -199,7 +207,6 @@
JSONPathSpec.DEFAULT,
Collections.emptyMap()
);
RowIngestionMeters mockRowIngestionMeters = Mockito.mock(RowIngestionMeters.class);
final StreamChunkParser<ByteEntity> chunkParser = new StreamChunkParser<>(
null,
inputFormat,
@@ -207,12 +214,151 @@
TransformSpec.NONE,
temporaryFolder.newFolder(),
row -> true,
mockRowIngestionMeters,
rowIngestionMeters,
parseExceptionHandler
);
List<InputRow> parsedRows = chunkParser.parse(ImmutableList.of(), true);
Assert.assertEquals(0, parsedRows.size());
Mockito.verifyNoInteractions(mockRowIngestionMeters);
Assert.assertEquals(0, rowIngestionMeters.getUnparseable());
Assert.assertEquals(0, rowIngestionMeters.getThrownAway());
}
@Test
public void testParseMalformedDataWithAllowedParseExceptions_thenNoException() throws IOException
{
final InputRowParser<ByteBuffer> parser = new StringInputRowParser(
new JSONParseSpec(
TIMESTAMP_SPEC,
DimensionsSpec.EMPTY,
JSONPathSpec.DEFAULT,
Collections.emptyMap(),
false
),
StringUtils.UTF8_STRING
);
final int maxAllowedParseExceptions = 1;
final StreamChunkParser<ByteEntity> chunkParser = new StreamChunkParser<>(
parser,
mockedByteEntityReader,
row -> true,
rowIngestionMeters,
new ParseExceptionHandler(
rowIngestionMeters,
false,
maxAllowedParseExceptions,
0
)
);
Mockito.when(mockedByteEntityReader.read()).thenThrow(new ParseException(null, "error parsing malformed data"));
final String json = "malformedJson";
List<InputRow> parsedRows = chunkParser.parse(
Collections.singletonList(
new ByteEntity(json.getBytes(StringUtils.UTF8_STRING))), false
);
// no exception and no parsed rows
Assert.assertEquals(0, parsedRows.size());
Assert.assertEquals(maxAllowedParseExceptions, rowIngestionMeters.getUnparseable());
}
@Test
public void testParseMalformedDataException() throws IOException
{
final InputRowParser<ByteBuffer> parser = new StringInputRowParser(
new JSONParseSpec(
TIMESTAMP_SPEC,
DimensionsSpec.EMPTY,
JSONPathSpec.DEFAULT,
Collections.emptyMap(),
false
),
StringUtils.UTF8_STRING
);
final StreamChunkParser<ByteEntity> chunkParser = new StreamChunkParser<>(
parser,
mockedByteEntityReader,
row -> true,
rowIngestionMeters,
parseExceptionHandler
);
Mockito.when(mockedByteEntityReader.read()).thenThrow(new ParseException(null, "error parsing malformed data"));
final String json = "malformedJson";
List<ByteEntity> byteEntities = Arrays.asList(
new ByteEntity(json.getBytes(StringUtils.UTF8_STRING)),
new ByteEntity(json.getBytes(StringUtils.UTF8_STRING)),
new ByteEntity(json.getBytes(StringUtils.UTF8_STRING)),
new ByteEntity(json.getBytes(StringUtils.UTF8_STRING)),
new ByteEntity(json.getBytes(StringUtils.UTF8_STRING))
);
Assert.assertThrows(
"Max parse exceptions[0] exceeded",
RE.class,
() -> chunkParser.parse(byteEntities, false)
);
Assert.assertEquals(1, rowIngestionMeters.getUnparseable()); // should barf on the first unparseable row
}
@Test
public void testParseMalformedDataWithUnlimitedAllowedParseExceptions_thenNoException() throws IOException
{
final InputRowParser<ByteBuffer> parser = new StringInputRowParser(
new JSONParseSpec(
TIMESTAMP_SPEC,
DimensionsSpec.EMPTY,
JSONPathSpec.DEFAULT,
Collections.emptyMap(),
false
),
StringUtils.UTF8_STRING
);
final StreamChunkParser<ByteEntity> chunkParser = new StreamChunkParser<>(
parser,
mockedByteEntityReader,
row -> true,
rowIngestionMeters,
new ParseExceptionHandler(
rowIngestionMeters,
false,
Integer.MAX_VALUE,
0
)
);
Mockito.when(mockedByteEntityReader.read()).thenThrow(new ParseException(null, "error parsing malformed data"));
final String json = "malformedJson";
List<ByteEntity> byteEntities = Arrays.asList(
new ByteEntity(json.getBytes(StringUtils.UTF8_STRING)),
new ByteEntity(json.getBytes(StringUtils.UTF8_STRING)),
new ByteEntity(json.getBytes(StringUtils.UTF8_STRING)),
new ByteEntity(json.getBytes(StringUtils.UTF8_STRING)),
new ByteEntity(json.getBytes(StringUtils.UTF8_STRING))
);
List<InputRow> parsedRows = chunkParser.parse(byteEntities, false);
// no exception since we have an unlimited threshold for parse exceptions
Assert.assertEquals(0, parsedRows.size());
Assert.assertEquals(byteEntities.size(), rowIngestionMeters.getUnparseable());
}
@Test
public void testWithNullParserAndNullByteEntityReaderFailToInstantiate()
{
Assert.assertThrows(
"Either parser or byteEntityReader should be set",
IAE.class,
() -> new StreamChunkParser<>(
null,
null,
row -> true,
rowIngestionMeters,
parseExceptionHandler
)
);
}
private void parseAndAssertResult(StreamChunkParser<ByteEntity> chunkParser) throws IOException
@@ -224,6 +370,7 @@ public class StreamChunkParserTest
Assert.assertEquals(DateTimes.of("2020-01-01"), row.getTimestamp());
Assert.assertEquals("val", Iterables.getOnlyElement(row.getDimension("dim")));
Assert.assertEquals("val2", Iterables.getOnlyElement(row.getDimension("met")));
Assert.assertEquals(0, rowIngestionMeters.getUnparseable());
}
private static class TrackingJsonInputFormat extends JsonInputFormat