Add support for Confluent Schema Registry in the avro extension (#3529)

This commit is contained in:
Nicolas Colomer 2016-11-08 23:10:45 +01:00 committed by Himanshu
parent 657e4512d2
commit 37ecffb648
5 changed files with 232 additions and 1 deletions

View File

@ -131,6 +131,18 @@ This Avro bytes decoder first extract `subject` and `id` from input message byte
| type | String | This should say `avro_1124_rest_client`. | no |
| url | String | Specifies the endpoint url of your Avro-1124 schema repository. | yes |
##### Confluent's Schema Registry
This Avro bytes decoder first extracts a unique `id` from the input message bytes, then uses it to look up the related schema in the Schema Registry, which is then used to decode the Avro record from the remaining bytes.
Details can be found in Schema Registry [documentation](http://docs.confluent.io/current/schema-registry/docs/) and [repository](https://github.com/confluentinc/schema-registry).
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `schema_registry`. | no |
| url | String | Specifies the URL endpoint of the Schema Registry. | yes |
| capacity | Integer | Specifies the maximum number of cached schemas (default: Integer.MAX_VALUE). | no |
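A sketch of how this decoder might appear in a parser spec's `avroBytesDecoder` field (the host and port in `url` are placeholders, and `capacity` is optional):

```json
"avroBytesDecoder" : {
  "type" : "schema_registry",
  "url" : "http://localhost:8081",
  "capacity" : 100
}
```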
### Avro Hadoop Parser
This is for batch ingestion using the HadoopDruidIndexer. The `inputFormat` of `inputSpec` in `ioConfig` must be set to `"io.druid.data.input.avro.AvroValueInputFormat"`. You may want to set the Avro reader's schema in `jobProperties` in `tuningConfig`, e.g. `"avro.schema.path.input.value": "/path/to/your/schema.avsc"` or `"avro.schema.input.value": "your_schema_JSON_object"`. If the reader's schema is not set, the schema embedded in the Avro object container file will be used; see the [Avro specification](http://avro.apache.org/docs/1.7.7/spec.html#Schema+Resolution). Make sure to include "io.druid.extensions:druid-avro-extensions" as an extension.
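For example, a `tuningConfig` fragment setting the reader's schema by path might look like the following (the path is the placeholder from the text above):

```json
"tuningConfig" : {
  "jobProperties" : {
    "avro.schema.path.input.value" : "/path/to/your/schema.avsc"
  }
}
```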

View File

@ -36,9 +36,17 @@
<properties>
<schemarepo.version>0.1.3</schemarepo.version>
<confluent.version>3.0.1</confluent.version>
<avro.version>1.7.7</avro.version>
</properties>
<repositories>
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.avro</groupId>
@ -67,6 +75,11 @@
<artifactId>schema-repo-avro</artifactId>
<version>${schemarepo.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-client</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
@ -77,6 +90,12 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.2.10</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.pig</groupId>
<artifactId>pig</artifactId>

View File

@ -28,7 +28,8 @@ import java.nio.ByteBuffer;
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "schema_inline", value = InlineSchemaAvroBytesDecoder.class),
@JsonSubTypes.Type(name = "multiple_schemas_inline", value = InlineSchemasAvroBytesDecoder.class),
@JsonSubTypes.Type(name = "schema_repo", value = SchemaRepoBasedAvroBytesDecoder.class)
@JsonSubTypes.Type(name = "schema_repo", value = SchemaRepoBasedAvroBytesDecoder.class),
@JsonSubTypes.Type(name = "schema_registry", value = SchemaRegistryBasedAvroBytesDecoder.class)
})
public interface AvroBytesDecoder
{

View File

@ -0,0 +1,92 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.avro;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.druid.java.util.common.parsers.ParseException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import java.nio.ByteBuffer;
public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder
{
private final SchemaRegistryClient registry;
@JsonCreator
public SchemaRegistryBasedAvroBytesDecoder(
@JsonProperty("url") String url,
@JsonProperty("capacity") Integer capacity
)
{
int identityMapCapacity = capacity == null ? Integer.MAX_VALUE : capacity;
this.registry = new CachedSchemaRegistryClient(url, identityMapCapacity);
}
// Visible for testing only.
@VisibleForTesting
SchemaRegistryBasedAvroBytesDecoder(SchemaRegistryClient registry)
{
this.registry = registry;
}
@Override
public GenericRecord parse(ByteBuffer bytes)
{
try {
bytes.get(); // ignore first \0 byte
int id = bytes.getInt(); // extract schema registry id
int length = bytes.limit() - 1 - 4; // payload length: total minus magic byte (1) and schema id (4)
int offset = bytes.position() + bytes.arrayOffset(); // absolute payload start in the backing array
Schema schema = registry.getByID(id);
DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
return reader.read(null, DecoderFactory.get().binaryDecoder(bytes.array(), offset, length, null));
} catch (Exception e) {
throw new ParseException(e, "Failed to decode Avro message");
}
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SchemaRegistryBasedAvroBytesDecoder that = (SchemaRegistryBasedAvroBytesDecoder) o;
return registry != null ? registry.equals(that.registry) : that.registry == null;
}
@Override
public int hashCode()
{
return registry != null ? registry.hashCode() : 0;
}
}
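For reference, the decoder above assumes Confluent's wire format: one zero "magic" byte, a 4-byte big-endian schema id, then the Avro binary payload; this is why `parse` skips 1 + 4 bytes before decoding. A minimal sketch of that framing follows (the `WireFormatSketch` class and `frame` helper are illustrative, not part of this commit):

```java
import java.nio.ByteBuffer;

// Hypothetical helper illustrating the wire format the decoder expects.
public class WireFormatSketch
{
  static ByteBuffer frame(int schemaId, byte[] avroBytes)
  {
    ByteBuffer bb = ByteBuffer.allocate(1 + 4 + avroBytes.length);
    bb.put((byte) 0);      // magic byte
    bb.putInt(schemaId);   // 4-byte schema registry id (big-endian by default)
    bb.put(avroBytes);     // Avro-encoded record payload
    bb.rewind();           // reset position to 0 so the decoder reads from the start
    return bb;
  }
}
```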

View File

@ -0,0 +1,107 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.avro;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.druid.data.input.AvroStreamInputRowParserTest;
import io.druid.data.input.SomeAvroDatum;
import io.druid.java.util.common.parsers.ParseException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
*/
public class SchemaRegistryBasedAvroBytesDecoderTest
{
SchemaRegistryClient registry;
@Before
public void setUp() throws Exception
{
registry = mock(SchemaRegistryClient.class);
}
@Test
public void testParse() throws Exception
{
// Given
when(registry.getByID(eq(1234))).thenReturn(SomeAvroDatum.getClassSchema());
GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
Schema schema = SomeAvroDatum.getClassSchema();
byte[] bytes = getAvroDatum(schema, someAvroDatum);
ByteBuffer bb = ByteBuffer.allocate(bytes.length + 5).put((byte) 0).putInt(1234).put(bytes);
bb.rewind();
// When
GenericRecord actual = new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
// Then
Assert.assertEquals(someAvroDatum.get("id"), actual.get("id"));
}
@Test(expected = ParseException.class)
public void testParseCorrupted() throws Exception
{
// Given
when(registry.getByID(eq(1234))).thenReturn(SomeAvroDatum.getClassSchema());
GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
Schema schema = SomeAvroDatum.getClassSchema();
byte[] bytes = getAvroDatum(schema, someAvroDatum);
ByteBuffer bb = ByteBuffer.allocate(bytes.length + 5).put((byte) 0).putInt(1234).put(bytes, 5, 10); // corrupted payload: 10 bytes from offset 5
bb.rewind();
// When
new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
}
@Test(expected = ParseException.class)
public void testParseWrongId() throws Exception
{
// Given
when(registry.getByID(anyInt())).thenThrow(new IOException("no pasaran"));
GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
Schema schema = SomeAvroDatum.getClassSchema();
byte[] bytes = getAvroDatum(schema, someAvroDatum);
ByteBuffer bb = ByteBuffer.allocate(bytes.length + 5).put((byte) 0).putInt(1234).put(bytes);
bb.rewind();
// When
new SchemaRegistryBasedAvroBytesDecoder(registry).parse(bb);
}
byte[] getAvroDatum(Schema schema, GenericRecord someAvroDatum) throws IOException
{
ByteArrayOutputStream out = new ByteArrayOutputStream();
DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null));
return out.toByteArray();
}
}