Throw parse exceptions on schema get errors for SchemaRegistryBasedAvroBytesDecoder (#12080)

* Add option to throw parse exceptions on schema get errors for SchemaRegistryBasedAvroBytesDecoder

* Remove option
This commit is contained in:
Jonathan Wei 2022-01-13 12:36:51 -06:00 committed by GitHub
parent 1dba089a62
commit 74c876e578
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 16 additions and 19 deletions

View File

@@ -488,6 +488,12 @@ Multiple Instances:
... ...
``` ```
###### Parse exceptions
The following errors when reading records will be considered parse exceptions, which can be limited and logged with ingestion task configurations such as `maxParseExceptions` and `maxSavedParseExceptions`:
- Failure to retrieve a schema due to misconfiguration or corrupt records (invalid schema IDs)
- Failure to decode an Avro message
### Avro OCF ### Avro OCF
To load the Avro OCF input format, load the Druid Avro extension ([`druid-avro-extensions`](../development/extensions-core/avro.md)). To load the Avro OCF input format, load the Druid Avro extension ([`druid-avro-extensions`](../development/extensions-core/avro.md)).

View File

@@ -35,7 +35,6 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader; import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.DecoderFactory;
import org.apache.druid.guice.annotations.Json; import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.utils.DynamicConfigProviderUtils; import org.apache.druid.utils.DynamicConfigProviderUtils;
@@ -141,10 +140,10 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder
schema = parsedSchema instanceof AvroSchema ? ((AvroSchema) parsedSchema).rawSchema() : null; schema = parsedSchema instanceof AvroSchema ? ((AvroSchema) parsedSchema).rawSchema() : null;
} }
catch (IOException | RestClientException ex) { catch (IOException | RestClientException ex) {
throw new RE(ex, "Failed to get Avro schema: %s", id); throw new ParseException(null, "Failed to get Avro schema: %s", id);
} }
if (schema == null) { if (schema == null) {
throw new RE("Failed to find Avro schema: %s", id); throw new ParseException(null, "Failed to find Avro schema: %s", id);
} }
DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema); DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
try { try {
@@ -164,24 +163,17 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder
if (o == null || getClass() != o.getClass()) { if (o == null || getClass() != o.getClass()) {
return false; return false;
} }
SchemaRegistryBasedAvroBytesDecoder that = (SchemaRegistryBasedAvroBytesDecoder) o; SchemaRegistryBasedAvroBytesDecoder that = (SchemaRegistryBasedAvroBytesDecoder) o;
return capacity == that.capacity
return Objects.equals(url, that.url) && && Objects.equals(url, that.url)
Objects.equals(capacity, that.capacity) && && Objects.equals(urls, that.urls)
Objects.equals(urls, that.urls) && && Objects.equals(config, that.config)
Objects.equals(config, that.config) && && Objects.equals(headers, that.headers);
Objects.equals(headers, that.headers);
} }
@Override @Override
public int hashCode() public int hashCode()
{ {
int result = url != null ? url.hashCode() : 0; return Objects.hash(registry, url, capacity, urls, config, headers, jsonMapper);
result = 31 * result + capacity;
result = 31 * result + (urls != null ? urls.hashCode() : 0);
result = 31 * result + (config != null ? config.hashCode() : 0);
result = 31 * result + (headers != null ? headers.hashCode() : 0);
return result;
} }
} }

View File

@@ -33,7 +33,6 @@ import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.druid.data.input.AvroStreamInputRowParserTest; import org.apache.druid.data.input.AvroStreamInputRowParserTest;
import org.apache.druid.data.input.SomeAvroDatum; import org.apache.druid.data.input.SomeAvroDatum;
import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.utils.DynamicConfigProviderUtils; import org.apache.druid.utils.DynamicConfigProviderUtils;
import org.junit.Assert; import org.junit.Assert;
@@ -148,7 +147,7 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb); new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
} }
@Test(expected = RE.class) @Test(expected = ParseException.class)
public void testParseWrongSchemaType() throws Exception public void testParseWrongSchemaType() throws Exception
{ {
// Given // Given
@@ -159,7 +158,7 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb); new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
} }
@Test(expected = RE.class) @Test(expected = ParseException.class)
public void testParseWrongId() throws Exception public void testParseWrongId() throws Exception
{ {
// Given // Given