diff --git a/docs/content/development/extensions-core/avro.md b/docs/content/development/extensions-core/avro.md
index af42240346d..f24d8f04f23 100644
--- a/docs/content/development/extensions-core/avro.md
+++ b/docs/content/development/extensions-core/avro.md
@@ -44,6 +44,28 @@ For example, using Avro stream parser with schema repo Avro bytes decoder:
If `type` is not included, the avroBytesDecoder defaults to `schema_repo`.
+##### Inline Schema Based Avro Bytes Decoder
+
+This decoder can be used when all input events can be read using the same schema. In that case, the schema can be specified inline in the ingestion task JSON itself, as shown below.
+
+```
+...
+"avroBytesDecoder": {
+ "type": "schema_inline",
+ "schema": {
+ // your schema goes here, for example:
+ "namespace": "io.druid.data",
+ "name": "User",
+ "type": "record",
+ "fields" [
+ { "name": "FullName", "type": "string" },
+ { "name": "Country", "type": "string" }
+ ]
+ }
+}
+...
+```
+
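+For reference, here is a sketch of how this decoder might sit inside a complete Avro stream parser spec (the `parseSpec` and schema body are elided; the schema would be the inline record shown above):
+
+```
+...
+"parser": {
+ "type": "avro_stream",
+ "avroBytesDecoder": {
+ "type": "schema_inline",
+ "schema": { ... }
+ },
+ "parseSpec": { ... }
+}
+...
+```
+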
##### SchemaRepo Based Avro Bytes Decoder
This Avro bytes decoder first extracts `subject` and `id` from the input message bytes, then uses them to look up the Avro schema with which to decode the Avro record. Details can be found in [schema repo](https://github.com/schema-repo/schema-repo) and [AVRO-1124](https://issues.apache.org/jira/browse/AVRO-1124). You will need an HTTP service like schema repo to hold the Avro schemas. For schema registration on the message producer side, you can refer to `io.druid.data.input.AvroStreamInputRowParserTest#testParse()`.
diff --git a/extensions-core/avro-extensions/pom.xml b/extensions-core/avro-extensions/pom.xml
index 159c6f3ff1b..1cd3125f445 100644
--- a/extensions-core/avro-extensions/pom.xml
+++ b/extensions-core/avro-extensions/pom.xml
@@ -90,6 +90,12 @@
<version>0.15.0</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>io.druid</groupId>
+ <artifactId>druid-processing</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ </dependency>
diff --git a/extensions-core/avro-extensions/src/main/java/io/druid/data/input/avro/AvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/io/druid/data/input/avro/AvroBytesDecoder.java
index 149b34aa806..76aec57b020 100644
--- a/extensions-core/avro-extensions/src/main/java/io/druid/data/input/avro/AvroBytesDecoder.java
+++ b/extensions-core/avro-extensions/src/main/java/io/druid/data/input/avro/AvroBytesDecoder.java
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = SchemaRepoBasedAvroBytesDecoder.class)
@JsonSubTypes(value = {
+ @JsonSubTypes.Type(name = "schema_inline", value = SimpleAvroBytesDecoder.class),
@JsonSubTypes.Type(name = "schema_repo", value = SchemaRepoBasedAvroBytesDecoder.class)
})
public interface AvroBytesDecoder
diff --git a/extensions-core/avro-extensions/src/main/java/io/druid/data/input/avro/SimpleAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/io/druid/data/input/avro/SimpleAvroBytesDecoder.java
new file mode 100644
index 00000000000..d850e941b72
--- /dev/null
+++ b/extensions-core/avro-extensions/src/main/java/io/druid/data/input/avro/SimpleAvroBytesDecoder.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.data.input.avro;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.metamx.common.logger.Logger;
+import com.metamx.common.parsers.ParseException;
+import io.druid.guice.annotations.Json;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.util.ByteBufferInputStream;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Map;
+
+/**
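+ * Decodes Avro records using a single reader schema specified inline in the ingestion spec.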
+ */
+public class SimpleAvroBytesDecoder implements AvroBytesDecoder
+{
+ private static final Logger logger = new Logger(SimpleAvroBytesDecoder.class);
+
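+ // The parsed Avro schema used for decoding; the raw JSON map is retained so Jackson can re-serialize the spec.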
+ private final Schema schemaObj;
+ private final Map<String, Object> schema;
+
+ @JsonCreator
+ public SimpleAvroBytesDecoder(
+ @JacksonInject @Json ObjectMapper mapper,
+ @JsonProperty("schema") Map schema
+ ) throws Exception
+ {
+ Preconditions.checkArgument(schema != null, "schema must be provided");
+
+ this.schema = schema;
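+ // Re-serialize the schema map to JSON text so Avro's Schema.Parser can consume it.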
+ String schemaStr = mapper.writeValueAsString(schema);
+
+ logger.info("Schema string [%s]", schemaStr);
+ schemaObj = new Schema.Parser().parse(schemaStr);
+ }
+
+ // Used by unit tests only.
+ @VisibleForTesting
+ SimpleAvroBytesDecoder(Schema schemaObj)
+ {
+ this.schemaObj = schemaObj;
+ this.schema = null;
+ }
+
+ @JsonProperty
+ public Map<String, Object> getSchema()
+ {
+ return schema;
+ }
+
+ @Override
+ public GenericRecord parse(ByteBuffer bytes)
+ {
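+ // Wrap the buffer in an Avro input stream and decode a single record using the inline reader schema.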
+ DatumReader<GenericRecord> reader = new GenericDatumReader<>(schemaObj);
+ ByteBufferInputStream inputStream = new ByteBufferInputStream(Collections.singletonList(bytes));
+ try {
+ return reader.read(null, DecoderFactory.get().binaryDecoder(inputStream, null));
+ }
+ catch (Exception e) {
+ throw new ParseException(e, "Failed to decode Avro message.");
+ }
+ }
+}
diff --git a/extensions-core/avro-extensions/src/test/java/io/druid/data/input/avro/SimpleAvroBytesDecoderTest.java b/extensions-core/avro-extensions/src/test/java/io/druid/data/input/avro/SimpleAvroBytesDecoderTest.java
new file mode 100644
index 00000000000..075c531ff48
--- /dev/null
+++ b/extensions-core/avro-extensions/src/test/java/io/druid/data/input/avro/SimpleAvroBytesDecoderTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.data.input.avro;
+
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.druid.data.input.AvroStreamInputRowParserTest;
+import io.druid.data.input.SomeAvroDatum;
+import io.druid.jackson.DefaultObjectMapper;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ */
+public class SimpleAvroBytesDecoderTest
+{
+ @Test
+ public void testSerde() throws Exception
+ {
+ String jsonStr = "{\n"
+ + " \"type\": \"schema_inline\",\n"
+ + " \"schema\": {\n"
+ + " \"namespace\": \"io.druid.data.input\",\n"
+ + " \"name\": \"SomeData\",\n"
+ + " \"type\": \"record\",\n"
+ + " \"fields\" : [\n"
+ + " {\"name\":\"timestamp\",\"type\":\"long\"},\n"
+ + " {\"name\":\"eventType\",\"type\":\"string\"},\n"
+ + " {\"name\":\"id\",\"type\":\"long\"}\n"
+ + " ]\n"
+ + " }\n"
+ + "}";
+
+ final ObjectMapper mapper = new DefaultObjectMapper();
+ mapper.setInjectableValues(
+ new InjectableValues.Std().addValue(ObjectMapper.class, mapper)
+ );
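+ // Round-trip the decoder through Jackson (read -> write -> read) to verify serde symmetry.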
+ SimpleAvroBytesDecoder actual = (SimpleAvroBytesDecoder) mapper.readValue(
+ mapper.writeValueAsString(
+ mapper.readValue(
+ jsonStr,
+ AvroBytesDecoder.class
+ )
+ ),
+ AvroBytesDecoder.class
+ );
+
+ Assert.assertEquals("SomeData", actual.getSchema().get("name"));
+ }
+
+ @Test
+ public void testParse() throws Exception
+ {
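+ // Encode a known record with its writer schema, then decode it back through the inline decoder.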
+ GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
+ Schema schema = SomeAvroDatum.getClassSchema();
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+ DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
+ writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null));
+
+ GenericRecord actual = new SimpleAvroBytesDecoder(schema).parse(ByteBuffer.wrap(out.toByteArray()));
+ Assert.assertEquals(someAvroDatum.get("id"), actual.get("id"));
+ }
+}