avro-extensions -- feature to specify avro reader schema inline in the task json for all events (#3249)

Himanshu 2016-08-10 12:49:26 -05:00 committed by Fangjin Yang
parent 1eb7a7e882
commit 46da682231
5 changed files with 211 additions and 0 deletions

View File

@@ -44,6 +44,28 @@ For example, using Avro stream parser with schema repo Avro bytes decoder:
If `type` is not included, the avroBytesDecoder defaults to `schema_repo`.
##### Inline Schema Based Avro Bytes Decoder
This decoder can be used when all input events can be read with the same schema. In that case, the schema can be specified inline in the task JSON itself, as shown below.
```
...
"avroBytesDecoder": {
  "type": "schema_inline",
  "schema": {
    // your schema goes here, for example:
    "namespace": "io.druid.data",
    "name": "User",
    "type": "record",
    "fields": [
      { "name": "FullName", "type": "string" },
      { "name": "Country", "type": "string" }
    ]
  }
}
...
```
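To show where this sits, here is a minimal sketch of the enclosing parser spec, assuming the standard `avro_stream` parser (the `schema` and `parseSpec` contents are elided; `parseSpec` follows the usual timestampSpec/dimensionsSpec layout):
```
"parser": {
  "type": "avro_stream",
  "avroBytesDecoder": {
    "type": "schema_inline",
    "schema": { ... }
  },
  "parseSpec": { ... }
}
```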
##### SchemaRepo Based Avro Bytes Decoder
This Avro bytes decoder first extracts `subject` and `id` from the input message bytes, then uses them to look up the Avro schema with which to decode the record. Details can be found in [schema repo](https://github.com/schema-repo/schema-repo) and [AVRO-1124](https://issues.apache.org/jira/browse/AVRO-1124). You will need an HTTP service like schema repo to hold the Avro schemas. For schema registration on the message producer side, you can refer to `io.druid.data.input.AvroStreamInputRowParserTest#testParse()`.
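A `schema_repo` decoder spec looks roughly like this (the `topic` and `url` values are placeholders for your own setup):
```
"avroBytesDecoder": {
  "type": "schema_repo",
  "subjectAndIdConverter": {
    "type": "avro_1124",
    "topic": "your_topic"
  },
  "schemaRepository": {
    "type": "avro_1124_rest_client",
    "url": "http://your-schema-repo:8081/"
  }
}
```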

View File

@@ -90,6 +90,12 @@
      <version>0.15.0</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>io.druid</groupId>
      <artifactId>druid-processing</artifactId>
      <version>${project.parent.version}</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>

View File

@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = SchemaRepoBasedAvroBytesDecoder.class)
@JsonSubTypes(value = {
    @JsonSubTypes.Type(name = "schema_inline", value = SimpleAvroBytesDecoder.class),
    @JsonSubTypes.Type(name = "schema_repo", value = SchemaRepoBasedAvroBytesDecoder.class)
})
public interface AvroBytesDecoder
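(The interface body is truncated in this view; the `@Override` on `parse` in the new implementation below implies it declares a single method:)
```
public interface AvroBytesDecoder
{
  GenericRecord parse(ByteBuffer bytes);
}
```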

View File

@@ -0,0 +1,92 @@
/*
 * Licensed to Metamarkets Group Inc. (Metamarkets) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Metamarkets licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package io.druid.data.input.avro;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.metamx.common.logger.Logger;
import com.metamx.common.parsers.ParseException;
import io.druid.guice.annotations.Json;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.util.ByteBufferInputStream;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
/**
*/
public class SimpleAvroBytesDecoder implements AvroBytesDecoder
{
  private static final Logger logger = new Logger(SimpleAvroBytesDecoder.class);

  private final Schema schemaObj;
  private final Map<String, Object> schema;

  @JsonCreator
  public SimpleAvroBytesDecoder(
      @JacksonInject @Json ObjectMapper mapper,
      @JsonProperty("schema") Map<String, Object> schema
  ) throws Exception
  {
    Preconditions.checkArgument(schema != null, "schema must be provided");

    this.schema = schema;

    // Round-trip the schema map through the JSON mapper so Avro's parser can read it.
    String schemaStr = mapper.writeValueAsString(schema);
    logger.info("Schema string [%s]", schemaStr);
    schemaObj = new Schema.Parser().parse(schemaStr);
  }

  // For unit tests only.
  @VisibleForTesting
  SimpleAvroBytesDecoder(Schema schemaObj)
  {
    this.schemaObj = schemaObj;
    this.schema = null;
  }

  @JsonProperty
  public Map<String, Object> getSchema()
  {
    return schema;
  }

  @Override
  public GenericRecord parse(ByteBuffer bytes)
  {
    DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schemaObj);
    ByteBufferInputStream inputStream = new ByteBufferInputStream(Collections.singletonList(bytes));
    try {
      return reader.read(null, DecoderFactory.get().binaryDecoder(inputStream, null));
    }
    catch (Exception e) {
      throw new ParseException(e, "Failed to decode avro message!");
    }
  }
}
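For orientation, here is a self-contained sketch (not part of this patch; the example class name, schema, and record values are made up) that deserializes the decoder from JSON the same way a task spec would be read, then round-trips one record through it, mirroring the tests below:
```
package io.druid.data.input.avro;

import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.jackson.DefaultObjectMapper;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.EncoderFactory;

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

public class InlineSchemaDecoderExample
{
  public static void main(String[] args) throws Exception
  {
    // Make the mapper injectable into itself, as the decoder's @JacksonInject expects.
    ObjectMapper mapper = new DefaultObjectMapper();
    mapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, mapper));

    // The "User" schema from the docs example above.
    String schemaJson = "{\"namespace\": \"io.druid.data\", \"name\": \"User\", \"type\": \"record\","
                        + " \"fields\": [{\"name\": \"FullName\", \"type\": \"string\"},"
                        + " {\"name\": \"Country\", \"type\": \"string\"}]}";

    // Deserialize the decoder from JSON, exactly as a task spec would be read.
    AvroBytesDecoder decoder = mapper.readValue(
        "{\"type\": \"schema_inline\", \"schema\": " + schemaJson + "}",
        AvroBytesDecoder.class
    );

    // Write one record with the same schema...
    Schema schema = new Schema.Parser().parse(schemaJson);
    GenericRecord user = new GenericData.Record(schema);
    user.put("FullName", "Jane Doe");
    user.put("Country", "NZ");
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    new GenericDatumWriter<GenericRecord>(schema).write(user, EncoderFactory.get().directBinaryEncoder(out, null));

    // ...and decode it back.
    GenericRecord decoded = decoder.parse(ByteBuffer.wrap(out.toByteArray()));
    System.out.println(decoded.get("FullName") + " / " + decoded.get("Country"));
  }
}
```
Note the decoder parses the schema once at construction and creates a fresh `GenericDatumReader` per call to `parse`.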

View File

@@ -0,0 +1,90 @@
/*
 * Licensed to Metamarkets Group Inc. (Metamarkets) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Metamarkets licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package io.druid.data.input.avro;

import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.data.input.AvroStreamInputRowParserTest;
import io.druid.data.input.SomeAvroDatum;
import io.druid.jackson.DefaultObjectMapper;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.junit.Assert;
import org.junit.Test;

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
/**
*/
public class SimpleAvroBytesDecoderTest
{
  @Test
  public void testSerde() throws Exception
  {
    String jsonStr = "{\n"
                     + " \"type\": \"schema_inline\",\n"
                     + " \"schema\": {\n"
                     + " \"namespace\": \"io.druid.data.input\",\n"
                     + " \"name\": \"SomeData\",\n"
                     + " \"type\": \"record\",\n"
                     + " \"fields\" : [\n"
                     + " {\"name\":\"timestamp\",\"type\":\"long\"},\n"
                     + " {\"name\":\"eventType\",\"type\":\"string\"},\n"
                     + " {\"name\":\"id\",\"type\":\"long\"}\n"
                     + " ]\n"
                     + " }\n"
                     + "}";

    final ObjectMapper mapper = new DefaultObjectMapper();
    mapper.setInjectableValues(
        new InjectableValues.Std().addValue(ObjectMapper.class, mapper)
    );

    // Round-trip: JSON -> decoder -> JSON -> decoder, to verify serde symmetry.
    SimpleAvroBytesDecoder actual = (SimpleAvroBytesDecoder) mapper.readValue(
        mapper.writeValueAsString(
            mapper.readValue(jsonStr, AvroBytesDecoder.class)
        ),
        AvroBytesDecoder.class
    );

    Assert.assertEquals("SomeData", actual.getSchema().get("name"));
  }

  @Test
  public void testParse() throws Exception
  {
    GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
    Schema schema = SomeAvroDatum.getClassSchema();

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
    writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null));

    GenericRecord actual = new SimpleAvroBytesDecoder(schema).parse(ByteBuffer.wrap(out.toByteArray()));
    Assert.assertEquals(someAvroDatum.get("id"), actual.get("id"));
  }
}