avro-extension -- feature to specify multiple avro reader schemas inline (#3368)

* rename SimpleAvroBytesDecoder to InlineSchemaAvroBytesDecoder

* feature to specify multiple schemas inline in avro module
Himanshu 2016-09-13 16:54:31 -05:00 committed by Fangjin Yang
parent 76a24054e3
commit a069257d37
6 changed files with 285 additions and 8 deletions

docs (Avro extension)

@@ -66,6 +66,46 @@ This decoder can be used if all the input events can be read using the same schema.
...
```
##### Multiple Inline Schemas Based Avro Bytes Decoder
This decoder can be used if different input events can have different read schemas. In that case, the schemas can be specified inline in the input task JSON itself, as described below.
```
...
"avroBytesDecoder": {
"type": "multiple_schemas_inline",
"schemas": {
// your schema ID -> schema map goes here, for example
"1": {
"namespace": "io.druid.data",
"name": "User",
"type": "record",
"fields" [
{ "name": "FullName", "type": "string" },
{ "name": "Country", "type": "string" }
]
},
"2": {
"namespace": "io.druid.otherdata",
"name": "UserIdentity",
"type": "record",
"fields" [
{ "name": "Name", "type": "string" },
{ "name": "Location", "type": "string" }
]
},
...
...
}
}
...
```
Note that this is essentially a map from integer schema ID to Avro schema object. The parser assumes that each record has the following format:

1. The first byte is the version and must always be 1.
2. The next 4 bytes are the integer schema ID, serialized in big-endian byte order.
3. The remaining bytes contain the serialized Avro message.
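On the producer side, framing a record in this format is straightforward. A minimal Java sketch, mirroring the test code later in this commit, where `schema` and `record` stand in for your Avro schema and datum:
```
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

ByteArrayOutputStream out = new ByteArrayOutputStream();
out.write(0x1);                                      // version byte, must be 1
out.write(ByteBuffer.allocate(4).putInt(1).array()); // schema ID 1, big-endian (ByteBuffer default)
DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
writer.write(record, EncoderFactory.get().directBinaryEncoder(out, null));
byte[] framed = out.toByteArray();                   // bytes the decoder above can parse
```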
##### SchemaRepo Based Avro Bytes Decoder
This Avro bytes decoder first extracts `subject` and `id` from the input message bytes, then uses them to look up the Avro schema with which to decode the record. Details can be found in [schema repo](https://github.com/schema-repo/schema-repo) and [AVRO-1124](https://issues.apache.org/jira/browse/AVRO-1124). You will need an HTTP service like schema repo to hold the Avro schemas. For schema registration on the message producer side, you can refer to `io.druid.data.input.AvroStreamInputRowParserTest#testParse()`.
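For orientation only, a configuration for this decoder would look roughly like the sketch below. The `subjectAndIdConverter` and `schemaRepository` sub-objects, their type names, and the placeholder topic/URL values are assumptions based on the AVRO-1124 REST client, not part of this change:
```
...
"avroBytesDecoder": {
  "type": "schema_repo",
  "subjectAndIdConverter": {
    "type": "avro_1124",
    "topic": "your_kafka_topic"
  },
  "schemaRepository": {
    "type": "avro_1124_rest_client",
    "url": "http://your-schema-repo-host:8081/schema-repo"
  }
}
...
```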

AvroBytesDecoder.java

@@ -26,7 +26,8 @@ import java.nio.ByteBuffer;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = SchemaRepoBasedAvroBytesDecoder.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "schema_inline", value = SimpleAvroBytesDecoder.class),
@JsonSubTypes.Type(name = "schema_inline", value = InlineSchemaAvroBytesDecoder.class),
@JsonSubTypes.Type(name = "multiple_schemas_inline", value = InlineSchemasAvroBytesDecoder.class),
@JsonSubTypes.Type(name = "schema_repo", value = SchemaRepoBasedAvroBytesDecoder.class)
})
public interface AvroBytesDecoder
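For context: the `type` property above is what Jackson uses to select the concrete decoder class at deserialization time. A minimal sketch, modeled on the serde tests later in this commit (the injectable `ObjectMapper` is required because the inline decoders take a `@JacksonInject` mapper):
```
ObjectMapper mapper = new DefaultObjectMapper();
mapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, mapper));

// "type" picks the matching @JsonSubTypes entry; omitting it falls back to
// SchemaRepoBasedAvroBytesDecoder via defaultImpl.
AvroBytesDecoder decoder = mapper.readValue(
    "{\"type\": \"schema_inline\", \"schema\": {"
    + "\"namespace\": \"io.druid.data\", \"name\": \"User\", \"type\": \"record\","
    + "\"fields\": [{\"name\": \"FullName\", \"type\": \"string\"}]}}",
    AvroBytesDecoder.class
);
// decoder is an InlineSchemaAvroBytesDecoder instance
```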

InlineSchemaAvroBytesDecoder.java (renamed from SimpleAvroBytesDecoder.java)

@@ -41,15 +41,15 @@ import java.util.Map;
/**
*/
-public class SimpleAvroBytesDecoder implements AvroBytesDecoder
+public class InlineSchemaAvroBytesDecoder implements AvroBytesDecoder
{
-private static final Logger logger = new Logger(SimpleAvroBytesDecoder.class);
+private static final Logger logger = new Logger(InlineSchemaAvroBytesDecoder.class);
private final Schema schemaObj;
private final Map<String, Object> schema;
@JsonCreator
-public SimpleAvroBytesDecoder(
+public InlineSchemaAvroBytesDecoder(
@JacksonInject @Json ObjectMapper mapper,
@JsonProperty("schema") Map<String, Object> schema
) throws Exception
@@ -65,7 +65,7 @@ public class SimpleAvroBytesDecoder implements AvroBytesDecoder
//For UT only
@VisibleForTesting
-SimpleAvroBytesDecoder(Schema schemaObj)
+InlineSchemaAvroBytesDecoder(Schema schemaObj)
{
this.schemaObj = schemaObj;
this.schema = null;

InlineSchemasAvroBytesDecoder.java (new file)

@@ -0,0 +1,127 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.avro;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.metamx.common.logger.Logger;
import com.metamx.common.parsers.ParseException;
import io.druid.guice.annotations.Json;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.util.ByteBufferInputStream;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
*/
public class InlineSchemasAvroBytesDecoder implements AvroBytesDecoder
{
private static final Logger logger = new Logger(InlineSchemasAvroBytesDecoder.class);
private static final byte V1 = 0x1;
private final Map<Integer, Schema> schemaObjs;
private final Map<String, Map<String, Object>> schemas;
@JsonCreator
public InlineSchemasAvroBytesDecoder(
@JacksonInject @Json ObjectMapper mapper,
@JsonProperty("schemas") Map<String, Map<String, Object>> schemas
) throws Exception
{
Preconditions.checkArgument(
schemas != null && schemas.size() > 0,
"at least one schema must be provided in schemas attribute"
);
this.schemas = schemas;
schemaObjs = new HashMap<>(schemas.size());
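// JSON object keys arrive as strings, so parse each key into its integer schema ID.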
for (Map.Entry<String, Map<String, Object>> e : schemas.entrySet()) {
int id = Integer.parseInt(e.getKey());
Map<String, Object> schema = e.getValue();
String schemaStr = mapper.writeValueAsString(schema);
logger.info("Schema string [%s] = [%s]", id, schemaStr);
schemaObjs.put(id, new Schema.Parser().parse(schemaStr));
}
}
@VisibleForTesting
public InlineSchemasAvroBytesDecoder(
Map<Integer, Schema> schemaObjs
)
{
this.schemaObjs = schemaObjs;
this.schemas = null;
}
@JsonProperty
public Map<String, Map<String, Object>> getSchemas()
{
return schemas;
}
// It is assumed that the record has the following format:
// byte 1 : version, static 0x1
// byte 2-5 : int schemaId
// remaining bytes would have avro data
@Override
public GenericRecord parse(ByteBuffer bytes)
{
if (bytes.remaining() < 5) {
throw new ParseException("record must have at least 5 bytes carrying version and schemaId");
}
byte version = bytes.get();
if (version != V1) {
throw new ParseException("found record of arbitrary version [%s]", version);
}
int schemaId = bytes.getInt();
Schema schemaObj = schemaObjs.get(schemaId);
if (schemaObj == null) {
throw new ParseException("Failed to find schema for id [%s]", schemaId);
}
try {
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schemaObj);
ByteBufferInputStream inputStream = new ByteBufferInputStream(Collections.singletonList(bytes));
return reader.read(null, DecoderFactory.get().binaryDecoder(inputStream, null));
}
catch (Exception e) {
throw new ParseException(e, "Fail to decode avro message with schemaId [%s].", schemaId);
}
}
}

InlineSchemaAvroBytesDecoderTest.java (renamed from SimpleAvroBytesDecoderTest.java)

@@ -37,7 +37,7 @@ import java.nio.ByteBuffer;
/**
*/
-public class SimpleAvroBytesDecoderTest
+public class InlineSchemaAvroBytesDecoderTest
{
@Test
public void testSerde() throws Exception
@@ -60,7 +60,7 @@ public class SimpleAvroBytesDecoderTest
mapper.setInjectableValues(
new InjectableValues.Std().addValue(ObjectMapper.class, mapper)
);
-SimpleAvroBytesDecoder actual = (SimpleAvroBytesDecoder) mapper.readValue(
+InlineSchemaAvroBytesDecoder actual = (InlineSchemaAvroBytesDecoder) mapper.readValue(
mapper.writeValueAsString(
mapper.readValue(
jsonStr,
@@ -84,7 +84,7 @@ public class SimpleAvroBytesDecoderTest
DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null));
-GenericRecord actual = new SimpleAvroBytesDecoder(schema).parse(ByteBuffer.wrap(out.toByteArray()));
+GenericRecord actual = new InlineSchemaAvroBytesDecoder(schema).parse(ByteBuffer.wrap(out.toByteArray()));
Assert.assertEquals(someAvroDatum.get("id"), actual.get("id"));
}
}

InlineSchemasAvroBytesDecoderTest.java (new file)

@@ -0,0 +1,109 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.avro;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import io.druid.data.input.AvroStreamInputRowParserTest;
import io.druid.data.input.SomeAvroDatum;
import io.druid.jackson.DefaultObjectMapper;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.junit.Assert;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
/**
*/
public class InlineSchemasAvroBytesDecoderTest
{
@Test
public void testSerde() throws Exception
{
String jsonStr = "{\n"
+ " \"type\": \"multiple_schemas_inline\",\n"
+ " \"schemas\": {\n"
+ " \"5\": {\n"
+ " \"namespace\": \"io.druid.data.input\",\n"
+ " \"name\": \"name5\",\n"
+ " \"type\": \"record\",\n"
+ " \"fields\" : [\n"
+ " {\"name\":\"eventType\",\"type\":\"string\"},\n"
+ " {\"name\":\"id\",\"type\":\"long\"}\n"
+ " ]\n"
+ " },\n"
+ " \"8\": {\n"
+ " \"namespace\": \"io.druid.data.input\",\n"
+ " \"name\": \"name8\",\n"
+ " \"type\": \"record\",\n"
+ " \"fields\" : [\n"
+ " {\"name\":\"eventType\",\"type\":\"string\"},\n"
+ " {\"name\":\"id\",\"type\":\"long\"}\n"
+ " ]\n"
+ " }\n"
+ " }\n"
+ "}\n";
final ObjectMapper mapper = new DefaultObjectMapper();
mapper.setInjectableValues(
new InjectableValues.Std().addValue(ObjectMapper.class, mapper)
);
InlineSchemasAvroBytesDecoder actual = (InlineSchemasAvroBytesDecoder) mapper.readValue(
mapper.writeValueAsString(
mapper.readValue(
jsonStr,
AvroBytesDecoder.class
)
),
AvroBytesDecoder.class
);
Assert.assertEquals(actual.getSchemas().get("5").get("name"), "name5");
Assert.assertEquals(actual.getSchemas().get("8").get("name"), "name8");
}
@Test
public void testParse() throws Exception
{
GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
Schema schema = SomeAvroDatum.getClassSchema();
ByteArrayOutputStream out = new ByteArrayOutputStream();
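// Frame the record: one version byte (1), then the 4-byte big-endian schema ID (10).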
out.write(new byte[]{1});
out.write(ByteBuffer.allocate(4).putInt(10).array());
DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null));
GenericRecord actual = new InlineSchemasAvroBytesDecoder(
ImmutableMap.of(
10,
schema
)
).parse(ByteBuffer.wrap(out.toByteArray()));
Assert.assertEquals(someAvroDatum.get("id"), actual.get("id"));
}
}