Attach IO error to parse error when we can't contact Avro schema registry. (#13403)

* Attach IO error to parse error when we can't contact Avro schema registry.

The change in #12080 lost the original exception context. This patch
adds it back.

* Add hamcrest-core.

* Fix format string.
This commit is contained in:
Gian Merlino 2022-11-21 22:20:26 -08:00 committed by GitHub
parent 280a0f7158
commit c6054b7cb7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 17 additions and 5 deletions

View File

@ -254,6 +254,11 @@
<artifactId>mockito-core</artifactId> <artifactId>mockito-core</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>org.apache.druid</groupId> <groupId>org.apache.druid</groupId>
<artifactId>druid-core</artifactId> <artifactId>druid-core</artifactId>

View File

@ -140,17 +140,17 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder
schema = parsedSchema instanceof AvroSchema ? ((AvroSchema) parsedSchema).rawSchema() : null; schema = parsedSchema instanceof AvroSchema ? ((AvroSchema) parsedSchema).rawSchema() : null;
} }
catch (IOException | RestClientException ex) { catch (IOException | RestClientException ex) {
throw new ParseException(null, "Failed to get Avro schema: %s", id); throw new ParseException(null, ex, "Failed to fetch Avro schema from registry: %s", id);
} }
if (schema == null) { if (schema == null) {
throw new ParseException(null, "Failed to find Avro schema: %s", id); throw new ParseException(null, "No Avro schema in registry: %s", id);
} }
DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema); DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
try { try {
return reader.read(null, DecoderFactory.get().binaryDecoder(bytes.array(), offset, length, null)); return reader.read(null, DecoderFactory.get().binaryDecoder(bytes.array(), offset, length, null));
} }
catch (Exception e) { catch (Exception e) {
throw new ParseException(null, e, "Fail to decode Avro message for schema: %s!", id); throw new ParseException(null, e, "Failed to decode Avro message for schema: %s", id);
} }
} }

View File

@ -35,6 +35,8 @@ import org.apache.druid.data.input.SomeAvroDatum;
import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.utils.DynamicConfigProviderUtils; import org.apache.druid.utils.DynamicConfigProviderUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -158,7 +160,7 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb); new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
} }
@Test(expected = ParseException.class) @Test
public void testParseWrongId() throws Exception public void testParseWrongId() throws Exception
{ {
// Given // Given
@ -166,7 +168,12 @@ public class SchemaRegistryBasedAvroBytesDecoderTest
ByteBuffer bb = ByteBuffer.allocate(5).put((byte) 0).putInt(1234); ByteBuffer bb = ByteBuffer.allocate(5).put((byte) 0).putInt(1234);
bb.rewind(); bb.rewind();
// When // When
new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb); final ParseException e = Assert.assertThrows(
ParseException.class,
() -> new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb)
);
MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(IOException.class));
MatcherAssert.assertThat(e.getCause().getMessage(), CoreMatchers.containsString("no pasaran"));
} }
private byte[] getAvroDatum(Schema schema, GenericRecord someAvroDatum) throws IOException private byte[] getAvroDatum(Schema schema, GenericRecord someAvroDatum) throws IOException