From 5e57ddb8cc1c40e1dedb0692c941ec4879f589ce Mon Sep 17 00:00:00 2001 From: Zhao Weinan Date: Sun, 25 Oct 2015 21:17:31 +0800 Subject: [PATCH] Adding avro support to realtime & hadoop batch indexing. --- docs/content/ingestion/batch-ingestion.md | 2 + docs/content/ingestion/index.md | 106 +++++++ extensions/avro-extensions/pom.xml | 107 +++++++ .../data/input/AvroHadoopInputRowParser.java | 70 +++++ .../data/input/AvroStreamInputRowParser.java | 117 ++++++++ .../data/input/avro/AvroBytesDecoder.java | 34 +++ .../data/input/avro/AvroExtensionsModule.java | 90 ++++++ .../data/input/avro/AvroValueInputFormat.java | 73 +++++ .../input/avro/AvroValueRecordReader.java | 52 ++++ .../data/input/avro/GenericRecordAsMap.java | 151 ++++++++++ .../avro/SchemaRepoBasedAvroBytesDecoder.java | 118 ++++++++ .../Avro1124RESTRepositoryClientWrapper.java | 70 +++++ .../Avro1124SubjectAndIdConverter.java | 100 +++++++ .../schemarepo/SubjectAndIdConverter.java | 59 ++++ .../io.druid.initialization.DruidModule | 1 + .../src/test/avro/some-datum.avsc | 31 ++ .../input/AvroHadoopInputRowParserTest.java | 132 +++++++++ .../input/AvroStreamInputRowParserTest.java | 273 ++++++++++++++++++ pom.xml | 2 +- 19 files changed, 1587 insertions(+), 1 deletion(-) create mode 100644 extensions/avro-extensions/pom.xml create mode 100644 extensions/avro-extensions/src/main/java/io/druid/data/input/AvroHadoopInputRowParser.java create mode 100644 extensions/avro-extensions/src/main/java/io/druid/data/input/AvroStreamInputRowParser.java create mode 100644 extensions/avro-extensions/src/main/java/io/druid/data/input/avro/AvroBytesDecoder.java create mode 100644 extensions/avro-extensions/src/main/java/io/druid/data/input/avro/AvroExtensionsModule.java create mode 100644 extensions/avro-extensions/src/main/java/io/druid/data/input/avro/AvroValueInputFormat.java create mode 100644 extensions/avro-extensions/src/main/java/io/druid/data/input/avro/AvroValueRecordReader.java create mode 100644 extensions/avro-extensions/src/main/java/io/druid/data/input/avro/GenericRecordAsMap.java create mode 100644 extensions/avro-extensions/src/main/java/io/druid/data/input/avro/SchemaRepoBasedAvroBytesDecoder.java create mode 100644 extensions/avro-extensions/src/main/java/io/druid/data/input/schemarepo/Avro1124RESTRepositoryClientWrapper.java create mode 100644 extensions/avro-extensions/src/main/java/io/druid/data/input/schemarepo/Avro1124SubjectAndIdConverter.java create mode 100644 extensions/avro-extensions/src/main/java/io/druid/data/input/schemarepo/SubjectAndIdConverter.java create mode 100644 extensions/avro-extensions/src/main/resources/META-INF/services/io.druid.initialization.DruidModule create mode 100644 extensions/avro-extensions/src/test/avro/some-datum.avsc create mode 100644 extensions/avro-extensions/src/test/java/io/druid/data/input/AvroHadoopInputRowParserTest.java create mode 100644 extensions/avro-extensions/src/test/java/io/druid/data/input/AvroStreamInputRowParserTest.java diff --git a/docs/content/ingestion/batch-ingestion.md b/docs/content/ingestion/batch-ingestion.md index da8140c7ed4..a480e1f075c 100644 --- a/docs/content/ingestion/batch-ingestion.md +++ b/docs/content/ingestion/batch-ingestion.md @@ -137,6 +137,7 @@ Is a type of inputSpec where a static path to where the data files are located i |Field|Type|Description|Required| |-----|----|-----------|--------| |paths|Array of String|A String of input paths indicating where the raw data is located.|yes| +|inputFormat|String|The input format of the data files. 
Default is `org.apache.hadoop.mapreduce.lib.input.TextInputFormat`, or `org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat` if `combineText` in tuningConfig is `true`.|no|

For example, using the static input paths:

@@ -154,6 +155,7 @@ Is a type of inputSpec that expects data to be laid out in a specific path format

|inputPath|String|Base path to append the expected time path to.|yes|
|filePattern|String|Pattern that files should match to be included.|yes|
|pathFormat|String|Joda date-time format for each directory. Default value is `"'y'=yyyy/'m'=MM/'d'=dd/'H'=HH"`, or see [Joda documentation](http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html)|no|
+|inputFormat|String|The input format of the data files. Default is `org.apache.hadoop.mapreduce.lib.input.TextInputFormat`, or `org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat` if `combineText` in tuningConfig is `true`.|no|

For example, if the sample config were run with the interval 2012-06-01/2012-06-02, it would expect data at the paths

diff --git a/docs/content/ingestion/index.md b/docs/content/ingestion/index.md
index c7a4dd67f81..9252bbe8db2 100644
--- a/docs/content/ingestion/index.md
+++ b/docs/content/ingestion/index.md
@@ -91,6 +91,112 @@ If `type` is not included, the parser defaults to `string`.

| type | String | This should say `protobuf`. | no |
| parseSpec | JSON Object | Specifies the format of the data. | yes |

### Avro Stream Parser

This is for realtime ingestion.

| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `avro_stream`. | no |
| avroBytesDecoder | JSON Object | Specifies how to decode bytes to an Avro record. | yes |
| parseSpec | JSON Object | Specifies the format of the data. | yes |

For example, using the Avro stream parser with the schema repo Avro bytes decoder:
```json
"parser" : {
  "type" : "avro_stream",
  "avroBytesDecoder" : {
    "type" : "schema_repo",
    "subjectAndIdConverter" : {
      "type" : "avro_1124",
      "topic" : "${YOUR_TOPIC}"
    },
    "schemaRepository" : {
      "type" : "avro_1124_rest_client",
      "url" : "${YOUR_SCHEMA_REPO_END_POINT}"
    }
  },
  "parseSpec" : {
    "format" : "timeAndDims",
    "timestampSpec" : {},
    "dimensionsSpec" : {}
  }
}
```

#### Avro Bytes Decoder

If `type` is not included, the avroBytesDecoder defaults to `schema_repo`.

##### SchemaRepo Based Avro Bytes Decoder

This Avro bytes decoder first extracts `subject` and `id` from the input message bytes, then uses them to look up the Avro schema with which to decode the Avro record. Details can be found in the [schema repo](https://github.com/schema-repo/schema-repo) project and in [AVRO-1124](https://issues.apache.org/jira/browse/AVRO-1124). You will need an HTTP service like schema repo to hold the Avro schemas. For schema registration on the message producer side, you can refer to `io.druid.data.input.AvroStreamInputRowParserTest#testParse()`; a producer-side sketch follows the tables below.

| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `schema_repo`. | no |
| subjectAndIdConverter | JSON Object | Specifies how to extract the subject and id from the message bytes. | yes |
| schemaRepository | JSON Object | Specifies how to look up the Avro schema from the subject and id. | yes |

##### Avro-1124 Subject And Id Converter

| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `avro_1124`. | no |
| topic | String | Specifies the topic of your Kafka stream. | yes |

##### Avro-1124 Schema Repository

| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `avro_1124_rest_client`. | no |
| url | String | Specifies the endpoint url of your Avro-1124 schema repository. | yes |
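On the producer side, registering the schema and prefixing each message with the returned id can look roughly like the following. This is a minimal sketch along the lines of `AvroStreamInputRowParserTest#testParse()`; the `${...}` values are placeholders, `SomeAvroDatum` is the generated test class, and `someAvroDatum` is the record being sent:

```java
// Sketch: register the writer's schema under the topic, then send [4-byte schema id][Avro binary] to Kafka.
Repository repository = new Avro1124RESTRepositoryClientWrapper("${YOUR_SCHEMA_REPO_END_POINT}");
TypedSchemaRepository<Integer, Schema, String> repositoryClient = new TypedSchemaRepository<>(
    repository, new IntegerConverter(), new AvroSchemaConverter(), new IdentityConverter()
);
Integer id = repositoryClient.registerSchema("${YOUR_TOPIC}", SomeAvroDatum.getClassSchema());

// Prefix the payload with the schema id (4 bytes, written by ByteBuffer.putInt()).
ByteBuffer idBuffer = ByteBuffer.allocate(4);
new Avro1124SubjectAndIdConverter("${YOUR_TOPIC}").putSubjectAndId("${YOUR_TOPIC}", id, idBuffer);

ByteArrayOutputStream out = new ByteArrayOutputStream();
out.write(idBuffer.array());
DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(SomeAvroDatum.getClassSchema());
writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null));
// out.toByteArray() is now the Kafka message body that the avro_stream parser expects
```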
### Avro Hadoop Parser

This is for batch ingestion using the HadoopDruidIndexer. The `inputFormat` of `inputSpec` in `ioConfig` must be set to `"io.druid.data.input.avro.AvroValueInputFormat"`. You may want to set the Avro reader's schema in `jobProperties` in `tuningConfig`, e.g. `"avro.schema.input.value.path": "/path/to/your/schema.avsc"` or `"avro.schema.input.value": "your_schema_JSON_object"`. If the reader's schema is not set, the schema embedded in the Avro object container file will be used; see the [Avro specification](http://avro.apache.org/docs/1.7.7/spec.html#Schema+Resolution).

| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `avro_hadoop`. | no |
| parseSpec | JSON Object | Specifies the format of the data. | yes |
| fromPigAvroStorage | Boolean | Specifies whether the data file is stored using Pig's AvroStorage. | no (default == false) |

For example, using the Avro Hadoop parser with a custom reader's schema file:
```json
{
  "type" : "index_hadoop",
  "hadoopDependencyCoordinates" : ["io.druid.extensions:druid-avro-extensions"],
  "spec" : {
    "dataSchema" : {
      "dataSource" : "",
      "parser" : {
        "type" : "avro_hadoop",
        "parseSpec" : {
          "format" : "timeAndDims",
          "timestampSpec" : {},
          "dimensionsSpec" : {}
        }
      }
    },
    "ioConfig" : {
      "type" : "hadoop",
      "inputSpec" : {
        "type" : "static",
        "inputFormat": "io.druid.data.input.avro.AvroValueInputFormat",
        "paths" : ""
      }
    },
    "tuningConfig" : {
      "jobProperties" : {
        "avro.schema.input.value.path" : "/path/to/my/schema.avsc"
      }
    }
  }
}
```

### ParseSpec

If `format` is not included, the parseSpec defaults to `tsv`.

diff --git a/extensions/avro-extensions/pom.xml b/extensions/avro-extensions/pom.xml
new file mode 100644
index 00000000000..029d3005934
--- /dev/null
+++ b/extensions/avro-extensions/pom.xml
@@ -0,0 +1,107 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to Metamarkets Group Inc. (Metamarkets) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  Metamarkets licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>io.druid.extensions</groupId>
+  <artifactId>druid-avro-extensions</artifactId>
+  <name>druid-avro-extensions</name>
+  <description>druid-avro-extensions</description>
+
+  <parent>
+    <groupId>io.druid</groupId>
+    <artifactId>druid</artifactId>
+    <version>0.9.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <properties>
+    <schemarepo.version>0.1.3</schemarepo.version>
+    <avro.version>1.7.7</avro.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro-mapred</artifactId>
+      <classifier>hadoop2</classifier>
+      <version>${avro.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.druid</groupId>
+      <artifactId>druid-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.schemarepo</groupId>
+      <artifactId>schema-repo-api</artifactId>
+      <version>${schemarepo.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.schemarepo</groupId>
+      <artifactId>schema-repo-client</artifactId>
+      <version>${schemarepo.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.schemarepo</groupId>
+      <artifactId>schema-repo-avro</artifactId>
+      <version>${schemarepo.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+    </dependency>
+
+    <!-- test -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pig</groupId>
+      <artifactId>pig</artifactId>
+      <version>0.15.0</version>
+      <classifier>h2</classifier>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pig</groupId>
+      <artifactId>piggybank</artifactId>
+      <version>0.15.0</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>

+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-maven-plugin</artifactId>
+        <version>${avro.version}</version>
+        <executions>
+          <execution>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>schema</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/extensions/avro-extensions/src/main/java/io/druid/data/input/AvroHadoopInputRowParser.java b/extensions/avro-extensions/src/main/java/io/druid/data/input/AvroHadoopInputRowParser.java
new file mode 100644
index 00000000000..58aaf11adf0
--- /dev/null
+++ b/extensions/avro-extensions/src/main/java/io/druid/data/input/AvroHadoopInputRowParser.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.druid.data.input; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.druid.data.input.impl.InputRowParser; +import io.druid.data.input.impl.ParseSpec; +import org.apache.avro.generic.GenericRecord; + +import java.util.List; + +public class AvroHadoopInputRowParser implements InputRowParser +{ + private final ParseSpec parseSpec; + private final List dimensions; + private final boolean fromPigAvroStorage; + + @JsonCreator + public AvroHadoopInputRowParser( + @JsonProperty("parseSpec") ParseSpec parseSpec, + @JsonProperty("fromPigAvroStorage") Boolean fromPigAvroStorage + ) + { + this.parseSpec = parseSpec; + this.dimensions = parseSpec.getDimensionsSpec().getDimensions(); + this.fromPigAvroStorage = fromPigAvroStorage == null ? false : fromPigAvroStorage; + } + + @Override + public InputRow parse(GenericRecord record) + { + return AvroStreamInputRowParser.parseGenericRecord(record, parseSpec, dimensions, fromPigAvroStorage); + } + + @JsonProperty + @Override + public ParseSpec getParseSpec() + { + return parseSpec; + } + + @JsonProperty + public boolean isFromPigAvroStorage() + { + return fromPigAvroStorage; + } + + @Override + public InputRowParser withParseSpec(ParseSpec parseSpec) + { + return new AvroHadoopInputRowParser(parseSpec, fromPigAvroStorage); + } +} diff --git a/extensions/avro-extensions/src/main/java/io/druid/data/input/AvroStreamInputRowParser.java b/extensions/avro-extensions/src/main/java/io/druid/data/input/AvroStreamInputRowParser.java new file mode 100644 index 00000000000..2e38024f2a1 --- /dev/null +++ b/extensions/avro-extensions/src/main/java/io/druid/data/input/AvroStreamInputRowParser.java @@ -0,0 +1,117 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package io.druid.data.input; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.druid.data.input.avro.AvroBytesDecoder; +import io.druid.data.input.avro.GenericRecordAsMap; +import io.druid.data.input.impl.ParseSpec; +import io.druid.data.input.impl.TimestampSpec; +import org.apache.avro.generic.GenericRecord; +import org.joda.time.DateTime; + +import java.nio.ByteBuffer; +import java.util.List; + +public class AvroStreamInputRowParser implements ByteBufferInputRowParser +{ + private final ParseSpec parseSpec; + private final List dimensions; + private final AvroBytesDecoder avroBytesDecoder; + + @JsonCreator + public AvroStreamInputRowParser( + @JsonProperty("parseSpec") ParseSpec parseSpec, + @JsonProperty("avroBytesDecoder") AvroBytesDecoder avroBytesDecoder + ) + { + this.parseSpec = parseSpec; + this.dimensions = parseSpec.getDimensionsSpec().getDimensions(); + this.avroBytesDecoder = avroBytesDecoder; + } + + @Override + public InputRow parse(ByteBuffer input) + { + return parseGenericRecord(avroBytesDecoder.parse(input), parseSpec, dimensions, false); + } + + protected static InputRow parseGenericRecord( + GenericRecord record, ParseSpec parseSpec, List dimensions, boolean fromPigAvroStorage + ) + { + GenericRecordAsMap genericRecordAsMap = new GenericRecordAsMap(record, fromPigAvroStorage); + TimestampSpec timestampSpec = parseSpec.getTimestampSpec(); + DateTime dateTime = timestampSpec.extractTimestamp(genericRecordAsMap); + return new MapBasedInputRow(dateTime, dimensions, genericRecordAsMap); + } + + @JsonProperty + @Override + public ParseSpec getParseSpec() + { + return parseSpec; + } + + @JsonProperty + public AvroBytesDecoder getAvroBytesDecoder() + { + return avroBytesDecoder; + } + + @Override + public ByteBufferInputRowParser withParseSpec(ParseSpec parseSpec) + { + return new AvroStreamInputRowParser( + parseSpec, + avroBytesDecoder + ); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + AvroStreamInputRowParser that = (AvroStreamInputRowParser) o; + + if (!parseSpec.equals(that.parseSpec)) { + return false; + } + if (!dimensions.equals(that.dimensions)) { + return false; + } + return avroBytesDecoder.equals(that.avroBytesDecoder); + } + + @Override + public int hashCode() + { + int result = parseSpec.hashCode(); + result = 31 * result + dimensions.hashCode(); + result = 31 * result + avroBytesDecoder.hashCode(); + return result; + } +} diff --git a/extensions/avro-extensions/src/main/java/io/druid/data/input/avro/AvroBytesDecoder.java b/extensions/avro-extensions/src/main/java/io/druid/data/input/avro/AvroBytesDecoder.java new file mode 100644 index 00000000000..149b34aa806 --- /dev/null +++ b/extensions/avro-extensions/src/main/java/io/druid/data/input/avro/AvroBytesDecoder.java @@ -0,0 +1,34 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.druid.data.input.avro; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.avro.generic.GenericRecord; + +import java.nio.ByteBuffer; + +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = SchemaRepoBasedAvroBytesDecoder.class) +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "schema_repo", value = SchemaRepoBasedAvroBytesDecoder.class) +}) +public interface AvroBytesDecoder +{ + GenericRecord parse(ByteBuffer bytes); +} diff --git a/extensions/avro-extensions/src/main/java/io/druid/data/input/avro/AvroExtensionsModule.java b/extensions/avro-extensions/src/main/java/io/druid/data/input/avro/AvroExtensionsModule.java new file mode 100644 index 00000000000..12b8a61dae1 --- /dev/null +++ b/extensions/avro-extensions/src/main/java/io/druid/data/input/avro/AvroExtensionsModule.java @@ -0,0 +1,90 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package io.druid.data.input.avro; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.inject.Binder; +import io.druid.data.input.AvroHadoopInputRowParser; +import io.druid.data.input.AvroStreamInputRowParser; +import io.druid.data.input.schemarepo.Avro1124RESTRepositoryClientWrapper; +import io.druid.initialization.DruidModule; +import org.schemarepo.InMemoryRepository; +import org.schemarepo.Repository; +import org.schemarepo.ValidatorFactory; +import org.schemarepo.json.GsonJsonUtil; +import org.schemarepo.json.JsonUtil; + +import java.util.Arrays; +import java.util.List; + +public class AvroExtensionsModule implements DruidModule +{ + public AvroExtensionsModule() {} + + @Override + public List getJacksonModules() + { + return Arrays.asList( + new SimpleModule("AvroInputRowParserModule") + .registerSubtypes( + new NamedType(AvroStreamInputRowParser.class, "avro_stream"), + new NamedType(AvroHadoopInputRowParser.class, "avro_hadoop") + ) + .setMixInAnnotation(Repository.class, RepositoryMixIn.class) + .setMixInAnnotation(JsonUtil.class, JsonUtilMixIn.class) + .setMixInAnnotation(InMemoryRepository.class, InMemoryRepositoryMixIn.class) + ); + } + + @Override + public void configure(Binder binder) + { } +} + +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = GsonJsonUtil.class) +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "gson", value = GsonJsonUtil.class) +}) +abstract class JsonUtilMixIn +{ +} + +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = Avro1124RESTRepositoryClientWrapper.class) +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "avro_1124_rest_client", value = Avro1124RESTRepositoryClientWrapper.class), + @JsonSubTypes.Type(name = "in_memory_for_unit_test", value = InMemoryRepository.class) +}) +abstract class RepositoryMixIn +{ +} + +abstract class InMemoryRepositoryMixIn +{ + @JsonCreator + public InMemoryRepositoryMixIn(@JsonProperty("validators") ValidatorFactory validators) + { + } +} + diff --git a/extensions/avro-extensions/src/main/java/io/druid/data/input/avro/AvroValueInputFormat.java b/extensions/avro-extensions/src/main/java/io/druid/data/input/avro/AvroValueInputFormat.java new file mode 100644 index 00000000000..db9f0090fde --- /dev/null +++ b/extensions/avro-extensions/src/main/java/io/druid/data/input/avro/AvroValueInputFormat.java @@ -0,0 +1,73 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.druid.data.input.avro; + +import com.metamx.common.logger.Logger; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.mapreduce.AvroJob; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; + +import java.io.IOException; + +public class AvroValueInputFormat extends FileInputFormat +{ + private static final Logger log = new Logger(AvroValueInputFormat.class); + + private static final String CONF_INPUT_VALUE_SCHEMA_PATH = "avro.schema.input.value.path"; + + /** + * {@inheritDoc} + */ + @Override + public RecordReader createRecordReader( + InputSplit split, TaskAttemptContext context + ) throws IOException, InterruptedException + { + Schema readerSchema = AvroJob.getInputValueSchema(context.getConfiguration()); + + if (readerSchema == null) { + String schemaFilePath = context.getConfiguration().get(CONF_INPUT_VALUE_SCHEMA_PATH); + if (StringUtils.isNotBlank(schemaFilePath)) { + log.info("Using file: %s as reader schema.", schemaFilePath); + FSDataInputStream inputStream = FileSystem.get(context.getConfiguration()).open(new Path(schemaFilePath)); + try { + readerSchema = new Schema.Parser().parse(inputStream); + } + finally { + inputStream.close(); + } + } + } + + if (null == readerSchema) { + log.warn("Reader schema was not set. Use AvroJob.setInputKeySchema() if desired."); + log.info("Using a reader schema equal to the writer schema."); + } + return new AvroValueRecordReader(readerSchema); + } +} diff --git a/extensions/avro-extensions/src/main/java/io/druid/data/input/avro/AvroValueRecordReader.java b/extensions/avro-extensions/src/main/java/io/druid/data/input/avro/AvroValueRecordReader.java new file mode 100644 index 00000000000..d0861eb471d --- /dev/null +++ b/extensions/avro-extensions/src/main/java/io/druid/data/input/avro/AvroValueRecordReader.java @@ -0,0 +1,52 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package io.druid.data.input.avro; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.mapreduce.AvroRecordReaderBase; +import org.apache.hadoop.io.NullWritable; + +import java.io.IOException; + +public class AvroValueRecordReader extends AvroRecordReaderBase +{ + public AvroValueRecordReader(Schema readerSchema) + { + super(readerSchema); + } + + /** + * {@inheritDoc} + */ + @Override + public NullWritable getCurrentKey() throws IOException, InterruptedException + { + return NullWritable.get(); + } + + /** + * {@inheritDoc} + */ + @Override + public GenericRecord getCurrentValue() throws IOException, InterruptedException + { + return getCurrentRecord(); + } +} diff --git a/extensions/avro-extensions/src/main/java/io/druid/data/input/avro/GenericRecordAsMap.java b/extensions/avro-extensions/src/main/java/io/druid/data/input/avro/GenericRecordAsMap.java new file mode 100644 index 00000000000..e129a13acb9 --- /dev/null +++ b/extensions/avro-extensions/src/main/java/io/druid/data/input/avro/GenericRecordAsMap.java @@ -0,0 +1,151 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.druid.data.input.avro; + +import com.google.common.base.Function; +import com.google.common.collect.Lists; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class GenericRecordAsMap implements Map +{ + private final GenericRecord record; + private final boolean fromPigAvroStorage; + + private static final Function PIG_AVRO_STORAGE_ARRAY_TO_STRING_INCLUDING_NULL = new Function() + { + @Nullable + @Override + public String apply(Object input) + { + return String.valueOf(((GenericRecord) input).get(0)); + } + }; + + public GenericRecordAsMap(GenericRecord record, boolean fromPigAvroStorage) + { + this.record = record; + this.fromPigAvroStorage = fromPigAvroStorage; + } + + @Override + public int size() + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isEmpty() + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean containsKey(Object key) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean containsValue(Object value) + { + throw new UnsupportedOperationException(); + } + + /** + * When used in MapBasedRow, field in GenericRecord will be interpret as follows: + *
    + *
+   * <ul>
+   * <li>avro schema type -> druid dimension:</li>
+   * <ul>
+   * <li>null, boolean, int, long, float, double, string, Records, Enums, Maps, Fixed -> String, using String.valueOf</li>
+   * <li>bytes -> Arrays.toString()</li>
+   * <li>Arrays -> List&lt;String&gt;, using Lists.transform(&lt;List&gt;dimValue, TO_STRING_INCLUDING_NULL)</li>
+   * </ul>
+   * <li>avro schema type -> druid metric:</li>
+   * <ul>
+   * <li>null -> 0F/0L</li>
+   * <li>int, long, float, double -> Float/Long, using Number.floatValue()/Number.longValue()</li>
+   * <li>string -> Float/Long, using Float.valueOf()/Long.valueOf()</li>
+   * <li>boolean, bytes, Arrays, Records, Enums, Maps, Fixed -> ParseException</li>
+   * </ul>
+   * </ul>
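+   * <p>
+   * A minimal sketch of that contract (the field names come from the test schema in this patch; the shown
+   * values are illustrative assumptions):
+   * <pre>{@code
+   * GenericRecordAsMap m = new GenericRecordAsMap(someAvroDatum, false);
+   * m.get("someBytes");  // a bytes field is rendered via Arrays.toString(), e.g. "[0, 0, 0, 0, 0, 0, 0, 0]"
+   * m.get("someLong");   // other fields are returned as-is; MapBasedRow does the String/Float/Long coercion
+   * }</pre>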
+ */ + @Override + public Object get(Object key) + { + Object field = record.get(key.toString()); + if (fromPigAvroStorage && field instanceof GenericData.Array) { + return Lists.transform((List) field, PIG_AVRO_STORAGE_ARRAY_TO_STRING_INCLUDING_NULL); + } + if (field instanceof ByteBuffer) { + return Arrays.toString(((ByteBuffer) field).array()); + } + return field; + } + + @Override + public Object put(String key, Object value) + { + throw new UnsupportedOperationException(); + } + + @Override + public Object remove(Object key) + { + throw new UnsupportedOperationException(); + } + + @Override + public void putAll(Map m) + { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() + { + throw new UnsupportedOperationException(); + } + + @Override + public Set keySet() + { + throw new UnsupportedOperationException(); + } + + @Override + public Collection values() + { + throw new UnsupportedOperationException(); + } + + @Override + public Set> entrySet() + { + throw new UnsupportedOperationException(); + } +} diff --git a/extensions/avro-extensions/src/main/java/io/druid/data/input/avro/SchemaRepoBasedAvroBytesDecoder.java b/extensions/avro-extensions/src/main/java/io/druid/data/input/avro/SchemaRepoBasedAvroBytesDecoder.java new file mode 100644 index 00000000000..b993021aa1d --- /dev/null +++ b/extensions/avro-extensions/src/main/java/io/druid/data/input/avro/SchemaRepoBasedAvroBytesDecoder.java @@ -0,0 +1,118 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package io.druid.data.input.avro; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.metamx.common.Pair; +import com.metamx.common.parsers.ParseException; +import io.druid.data.input.schemarepo.SubjectAndIdConverter; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.util.ByteBufferInputStream; +import org.schemarepo.Repository; +import org.schemarepo.api.TypedSchemaRepository; +import org.schemarepo.api.converter.AvroSchemaConverter; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; + +public class SchemaRepoBasedAvroBytesDecoder implements AvroBytesDecoder +{ + private final TypedSchemaRepository typedRepository; + private final SubjectAndIdConverter subjectAndIdConverter; + private final Repository schemaRepository; + + @JsonCreator + public SchemaRepoBasedAvroBytesDecoder( + @JsonProperty("subjectAndIdConverter") SubjectAndIdConverter subjectAndIdConverter, + @JsonProperty("schemaRepository") Repository schemaRepository + ) + { + this.subjectAndIdConverter = subjectAndIdConverter; + this.schemaRepository = schemaRepository; + typedRepository = new TypedSchemaRepository( + schemaRepository, + subjectAndIdConverter.getIdConverter(), + new AvroSchemaConverter(false), + subjectAndIdConverter.getSubjectConverter() + ); + } + + @JsonProperty + public Repository getSchemaRepository() + { + return schemaRepository; + } + + @JsonProperty + public SubjectAndIdConverter getSubjectAndIdConverter() + { + return subjectAndIdConverter; + } + + @Override + public GenericRecord parse(ByteBuffer bytes) + { + Pair subjectAndId = subjectAndIdConverter.getSubjectAndId(bytes); + Schema schema = typedRepository.getSchema(subjectAndId.lhs, subjectAndId.rhs); + DatumReader reader = new GenericDatumReader(schema); + ByteBufferInputStream inputStream = new ByteBufferInputStream(Collections.singletonList(bytes)); + try { + return reader.read(null, DecoderFactory.get().binaryDecoder(inputStream, null)); + } + catch (IOException e) { + throw new ParseException(e, "Fail to decode avro message!"); + } + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + SchemaRepoBasedAvroBytesDecoder that = (SchemaRepoBasedAvroBytesDecoder) o; + + if (subjectAndIdConverter != null + ? !subjectAndIdConverter.equals(that.subjectAndIdConverter) + : that.subjectAndIdConverter != null) { + return false; + } + return !(schemaRepository != null + ? !schemaRepository.equals(that.schemaRepository) + : that.schemaRepository != null); + } + + @Override + public int hashCode() + { + int result = subjectAndIdConverter != null ? subjectAndIdConverter.hashCode() : 0; + result = 31 * result + (schemaRepository != null ? 
schemaRepository.hashCode() : 0); + return result; + } +} diff --git a/extensions/avro-extensions/src/main/java/io/druid/data/input/schemarepo/Avro1124RESTRepositoryClientWrapper.java b/extensions/avro-extensions/src/main/java/io/druid/data/input/schemarepo/Avro1124RESTRepositoryClientWrapper.java new file mode 100644 index 00000000000..ffad65e3904 --- /dev/null +++ b/extensions/avro-extensions/src/main/java/io/druid/data/input/schemarepo/Avro1124RESTRepositoryClientWrapper.java @@ -0,0 +1,70 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.druid.data.input.schemarepo; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.schemarepo.client.Avro1124RESTRepositoryClient; + +public class Avro1124RESTRepositoryClientWrapper extends Avro1124RESTRepositoryClient +{ + private final String url; + + public Avro1124RESTRepositoryClientWrapper( + @JsonProperty("url") String url + ) + { + super(url); + this.url = url; + } + + @JsonIgnore + @Override + public String getStatus() + { + return super.getStatus(); + } + + @JsonProperty + public String getUrl() + { + return url; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + Avro1124RESTRepositoryClientWrapper that = (Avro1124RESTRepositoryClientWrapper) o; + + return !(url != null ? !url.equals(that.url) : that.url != null); + } + + @Override + public int hashCode() + { + return url != null ? url.hashCode() : 0; + } +} diff --git a/extensions/avro-extensions/src/main/java/io/druid/data/input/schemarepo/Avro1124SubjectAndIdConverter.java b/extensions/avro-extensions/src/main/java/io/druid/data/input/schemarepo/Avro1124SubjectAndIdConverter.java new file mode 100644 index 00000000000..648f468f1c3 --- /dev/null +++ b/extensions/avro-extensions/src/main/java/io/druid/data/input/schemarepo/Avro1124SubjectAndIdConverter.java @@ -0,0 +1,100 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.druid.data.input.schemarepo; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.metamx.common.Pair; +import org.schemarepo.api.converter.Converter; +import org.schemarepo.api.converter.IdentityConverter; +import org.schemarepo.api.converter.IntegerConverter; + +import java.nio.ByteBuffer; + +/** + * This implementation using injected Kafka topic name as subject name, and an integer as schema id. Before sending avro + * message to Kafka broker, you need to register the schema to an schema repository, get the schema id, serialized it to + * 4 bytes and then insert them to the head of the payload. In the reading end, you extract 4 bytes from raw messages, + * deserialize and return it with the topic name, with which you can lookup the avro schema. + * + * @see SubjectAndIdConverter + */ +public class Avro1124SubjectAndIdConverter implements SubjectAndIdConverter +{ + private final String topic; + + @JsonCreator + public Avro1124SubjectAndIdConverter(@JsonProperty("topic") String topic) + { + this.topic = topic; + } + + + @Override + public Pair getSubjectAndId(ByteBuffer payload) + { + return new Pair(topic, payload.getInt()); + } + + @Override + public void putSubjectAndId(String subject, Integer id, ByteBuffer payload) + { + payload.putInt(id); + } + + @Override + public Converter getSubjectConverter() + { + return new IdentityConverter(); + } + + @Override + public Converter getIdConverter() + { + return new IntegerConverter(); + } + + @JsonProperty + public String getTopic() + { + return topic; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + Avro1124SubjectAndIdConverter converter = (Avro1124SubjectAndIdConverter) o; + + return !(topic != null ? !topic.equals(converter.topic) : converter.topic != null); + + } + + @Override + public int hashCode() + { + return topic != null ? topic.hashCode() : 0; + } +} diff --git a/extensions/avro-extensions/src/main/java/io/druid/data/input/schemarepo/SubjectAndIdConverter.java b/extensions/avro-extensions/src/main/java/io/druid/data/input/schemarepo/SubjectAndIdConverter.java new file mode 100644 index 00000000000..01331dfa25e --- /dev/null +++ b/extensions/avro-extensions/src/main/java/io/druid/data/input/schemarepo/SubjectAndIdConverter.java @@ -0,0 +1,59 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package io.druid.data.input.schemarepo; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.metamx.common.Pair; +import org.schemarepo.api.converter.Converter; + +import java.nio.ByteBuffer; + +/** + * Schema Repository is a registry service, you can register a string schema which gives back an schema id for it, + * or lookup the schema with the schema id. + *
+ * In order to get the "latest" schema or to enforce compatibility on changes, there has to be some way to group
+ * a set of schemas together and reason about the ordering of changes over them. Subject is introduced as
+ * the formal notion of such a group, defined as an ordered collection of mutually compatible schemas, according to
+ * Scott Carey on AVRO-1124.
+ *
+ * So you can register a string schema under a specific subject, get a schema id back, and then query the schema
+ * using the subject and schema id pair. Working with Kafka and Avro, it is natural to use the Kafka topic as the
+ * subject name and an incrementing integer as the schema id, serializing and attaching them to the message payload
+ * on the producer side and extracting and deserializing them on the consumer side, which is what
+ * {@link Avro1124SubjectAndIdConverter} implements.
+ *
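+ * <p>
+ * Under that converter, the wire layout of each Kafka message is (a sketch; the 4-byte width follows from the
+ * ByteBuffer.putInt() call in Avro1124SubjectAndIdConverter):
+ * <pre>
+ *   bytes [0..3]  schema id, big-endian int
+ *   bytes [4..]   Avro binary-encoded datum
+ * </pre>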
+ * You can implement your own SubjectAndIdConverter based on your scenario, such as using canonical name of avro schema + * as subject name and incrementing short integer which serialized using varint. + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = Avro1124SubjectAndIdConverter.class) +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "avro_1124", value = Avro1124SubjectAndIdConverter.class) +}) +public interface SubjectAndIdConverter +{ + + Pair getSubjectAndId(ByteBuffer payload); + + void putSubjectAndId(SUBJECT subject, ID id, ByteBuffer payload); + + Converter getSubjectConverter(); + + Converter getIdConverter(); +} diff --git a/extensions/avro-extensions/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions/avro-extensions/src/main/resources/META-INF/services/io.druid.initialization.DruidModule new file mode 100644 index 00000000000..050a1dd0727 --- /dev/null +++ b/extensions/avro-extensions/src/main/resources/META-INF/services/io.druid.initialization.DruidModule @@ -0,0 +1 @@ +io.druid.data.input.avro.AvroExtensionsModule diff --git a/extensions/avro-extensions/src/test/avro/some-datum.avsc b/extensions/avro-extensions/src/test/avro/some-datum.avsc new file mode 100644 index 00000000000..773313ae15a --- /dev/null +++ b/extensions/avro-extensions/src/test/avro/some-datum.avsc @@ -0,0 +1,31 @@ +[{ + "namespace": "io.druid.data.input", + "name": "SomeAvroDatum", + "type": "record", + "fields" : [ + {"name":"timestamp","type":"long"}, + {"name":"eventType","type":"string"}, + {"name":"id","type":"long"}, + {"name":"someOtherId","type":"long"}, + {"name":"isValid","type":"boolean"}, + {"name":"someIntArray","type":{"type":"array","items":"int"}}, + {"name":"someStringArray","type":{"type":"array","items":"string"}}, + {"name":"someIntValueMap","type":{"type":"map","values":"int"}}, + {"name":"someStringValueMap","type":{"type":"map","values":"string"}}, + {"name":"someUnion","type":["null","string"]}, + {"name":"someNull","type":"null"}, + {"name":"someFixed","type":{"type":"fixed","size":16,"name":"MyFixed"}}, + {"name":"someBytes","type":"bytes"}, + {"name":"someEnum","type":{"type":"enum","name":"MyEnum","symbols":["ENUM0","ENUM1","ENUM2"]}}, + {"name":"someRecord","type":{ + "type":"record","name":"MySubRecord","fields":[ + {"name":"subInt","type":"int"}, + {"name":"subLong","type":"long"} + ] + }}, + + {"name":"someLong","type":"long"}, + {"name":"someInt","type":"int"}, + {"name":"someFloat","type":"float"} + ] +}] diff --git a/extensions/avro-extensions/src/test/java/io/druid/data/input/AvroHadoopInputRowParserTest.java b/extensions/avro-extensions/src/test/java/io/druid/data/input/AvroHadoopInputRowParserTest.java new file mode 100644 index 00000000000..59c9a4cdaf8 --- /dev/null +++ b/extensions/avro-extensions/src/test/java/io/druid/data/input/AvroHadoopInputRowParserTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.druid.data.input; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.io.Closeables; +import com.google.common.io.Files; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.file.FileReader; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.io.FileUtils; +import org.apache.pig.ExecType; +import org.apache.pig.PigServer; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; + +import static io.druid.data.input.AvroStreamInputRowParserTest.PARSE_SPEC; +import static io.druid.data.input.AvroStreamInputRowParserTest.assertInputRowCorrect; +import static io.druid.data.input.AvroStreamInputRowParserTest.buildSomeAvroDatum; + +public class AvroHadoopInputRowParserTest +{ + private final ObjectMapper jsonMapper = new ObjectMapper(); + + @Test + public void testParseNotFromPigAvroStorage() throws IOException + { + testParse(buildSomeAvroDatum(), false); + } + + @Test + public void testParseFromPiggyBankAvroStorage() throws IOException + { + testParse(buildPiggyBankAvro(), false); + } + + @Test + public void testParseFromPigAvroStorage() throws IOException + { + testParse(buildPigAvro(), true); + } + + private void testParse(GenericRecord record, boolean fromPigAvroStorage) throws IOException + { + AvroHadoopInputRowParser parser = new AvroHadoopInputRowParser(PARSE_SPEC, fromPigAvroStorage); + AvroHadoopInputRowParser parser2 = jsonMapper.readValue( + jsonMapper.writeValueAsBytes(parser), + AvroHadoopInputRowParser.class + ); + InputRow inputRow = parser2.parse(record); + assertInputRowCorrect(inputRow); + } + + + public static GenericRecord buildPigAvro() throws IOException + { + return buildPigAvro(buildSomeAvroDatum(), "AvroStorage", "AvroStorage"); + } + + public static GenericRecord buildPiggyBankAvro() throws IOException + { + return buildPigAvro( + buildSomeAvroDatum(), + "org.apache.pig.piggybank.storage.avro.AvroStorage", + "org.apache.pig.piggybank.storage.avro.AvroStorage('field7','{\"type\":\"map\",\"values\":\"int\"}','field8','{\"type\":\"map\",\"values\":\"string\"}')" + ); + } + + private static GenericRecord buildPigAvro(GenericRecord datum, String inputStorage, String outputStorage) + throws IOException + { + final File tmpDir = Files.createTempDir(); + FileReader reader = null; + PigServer pigServer = null; + try { + // 0. write avro object into temp file. + File someAvroDatumFile = new File(tmpDir, "someAvroDatum.avro"); + DataFileWriter dataFileWriter = new DataFileWriter( + new GenericDatumWriter() + ); + dataFileWriter.create(SomeAvroDatum.getClassSchema(), someAvroDatumFile); + dataFileWriter.append(datum); + dataFileWriter.close(); + // 1. read avro files into Pig + pigServer = new PigServer(ExecType.LOCAL); + pigServer.registerQuery( + String.format( + "A = LOAD '%s' USING %s;", + someAvroDatumFile, + inputStorage + ) + ); + // 2. 
write new avro file using AvroStorage + File outputDir = new File(tmpDir, "output"); + pigServer.store("A", String.valueOf(outputDir), outputStorage); + // 3. read avro object from AvroStorage + reader = DataFileReader.openReader( + new File(outputDir, "part-m-00000.avro"), + new GenericDatumReader() + ); + return reader.next(); + } + finally { + if (pigServer != null) { + pigServer.shutdown(); + } + Closeables.close(reader, true); + FileUtils.deleteDirectory(tmpDir); + } + } +} diff --git a/extensions/avro-extensions/src/test/java/io/druid/data/input/AvroStreamInputRowParserTest.java b/extensions/avro-extensions/src/test/java/io/druid/data/input/AvroStreamInputRowParserTest.java new file mode 100644 index 00000000000..609d67f6794 --- /dev/null +++ b/extensions/avro-extensions/src/test/java/io/druid/data/input/AvroStreamInputRowParserTest.java @@ -0,0 +1,273 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.druid.data.input; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.MapperFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.google.common.base.Function; +import com.google.common.base.Splitter; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import io.druid.data.input.avro.AvroExtensionsModule; +import io.druid.data.input.avro.SchemaRepoBasedAvroBytesDecoder; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.TimeAndDimsParseSpec; +import io.druid.data.input.impl.TimestampSpec; +import io.druid.data.input.schemarepo.Avro1124RESTRepositoryClientWrapper; +import io.druid.data.input.schemarepo.Avro1124SubjectAndIdConverter; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.EncoderFactory; +import org.joda.time.DateTime; +import org.junit.Before; +import org.junit.Test; +import org.schemarepo.InMemoryRepository; +import org.schemarepo.Repository; +import org.schemarepo.SchemaValidationException; +import org.schemarepo.api.TypedSchemaRepository; +import org.schemarepo.api.converter.AvroSchemaConverter; +import org.schemarepo.api.converter.IdentityConverter; +import org.schemarepo.api.converter.IntegerConverter; + +import javax.annotation.Nullable; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class 
AvroStreamInputRowParserTest +{ + public static final String EVENT_TYPE = "eventType"; + public static final String ID = "id"; + public static final String SOME_OTHER_ID = "someOtherId"; + public static final String IS_VALID = "isValid"; + public static final String TOPIC = "aTopic"; + public static final String EVENT_TYPE_VALUE = "type-a"; + public static final long ID_VALUE = 1976491L; + public static final long SOME_OTHER_ID_VALUE = 6568719896L; + public static final float SOME_FLOAT_VALUE = 0.23555f; + public static final int SOME_INT_VALUE = 1; + public static final long SOME_LONG_VALUE = 679865987569912369L; + public static final DateTime DATE_TIME = new DateTime(2015, 10, 25, 19, 30); + public static final List DIMENSIONS = Arrays.asList(EVENT_TYPE, ID, SOME_OTHER_ID, IS_VALID); + public static final TimeAndDimsParseSpec PARSE_SPEC = new TimeAndDimsParseSpec( + new TimestampSpec("timestamp", "millis", null), + new DimensionsSpec(DIMENSIONS, Collections.emptyList(), null) + ); + public static final MyFixed SOME_FIXED_VALUE = new MyFixed(ByteBuffer.allocate(16).array()); + private static final long SUB_LONG_VALUE = 1543698L; + private static final int SUB_INT_VALUE = 4892; + public static final MySubRecord SOME_RECORD_VALUE = MySubRecord.newBuilder() + .setSubInt(SUB_INT_VALUE) + .setSubLong(SUB_LONG_VALUE) + .build(); + public static final List SOME_STRING_ARRAY_VALUE = Arrays.asList((CharSequence) "8", "4", "2", "1"); + public static final List SOME_INT_ARRAY_VALUE = Arrays.asList(1, 2, 4, 8); + public static final Map SOME_INT_VALUE_MAP_VALUE = Maps.asMap( + new HashSet(Arrays.asList("8", "2", "4", "1")), new Function() + { + @Nullable + @Override + public Integer apply(@Nullable CharSequence input) { return Integer.parseInt(input.toString()); } + } + ); + public static final Map SOME_STRING_VALUE_MAP_VALUE = Maps.asMap( + new HashSet(Arrays.asList("8", "2", "4", "1")), new Function() + { + @Nullable + @Override + public CharSequence apply(@Nullable CharSequence input) { return input.toString(); } + } + ); + public static final String SOME_UNION_VALUE = "string as union"; + public static final ByteBuffer SOME_BYTES_VALUE = ByteBuffer.allocate(8); + private static final Function TO_STRING_INCLUDING_NULL = new Function() + { + public String apply(Object o) { return String.valueOf(o); } + }; + + private final ObjectMapper jsonMapper = new ObjectMapper(); + + + @Before + public void before() + { + jsonMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); + jsonMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + for (com.fasterxml.jackson.databind.Module jacksonModule : new AvroExtensionsModule().getJacksonModules()) { + jsonMapper.registerModule(jacksonModule); + } + } + + @Test + public void testSerde() throws IOException + { + Repository repository = new Avro1124RESTRepositoryClientWrapper("http://github.io"); + AvroStreamInputRowParser parser = new AvroStreamInputRowParser( + PARSE_SPEC, + new SchemaRepoBasedAvroBytesDecoder(new Avro1124SubjectAndIdConverter(TOPIC), repository) + ); + ByteBufferInputRowParser parser2 = jsonMapper.readValue( + jsonMapper.writeValueAsString(parser), + ByteBufferInputRowParser.class + ); + + assertEquals(parser, parser2); + } + + @Test + public void testParse() throws SchemaValidationException, IOException + { + // serde test + Repository repository = new InMemoryRepository(null); + AvroStreamInputRowParser parser = new AvroStreamInputRowParser( + PARSE_SPEC, + new SchemaRepoBasedAvroBytesDecoder(new 
Avro1124SubjectAndIdConverter(TOPIC), repository) + ); + ByteBufferInputRowParser parser2 = jsonMapper.readValue( + jsonMapper.writeValueAsString(parser), + ByteBufferInputRowParser.class + ); + repository = ((SchemaRepoBasedAvroBytesDecoder) ((AvroStreamInputRowParser) parser2).getAvroBytesDecoder()).getSchemaRepository(); + + // prepare data + GenericRecord someAvroDatum = buildSomeAvroDatum(); + + // encode schema id + Avro1124SubjectAndIdConverter converter = new Avro1124SubjectAndIdConverter(TOPIC); + TypedSchemaRepository repositoryClient = new TypedSchemaRepository( + repository, + new IntegerConverter(), + new AvroSchemaConverter(), + new IdentityConverter() + ); + Integer id = repositoryClient.registerSchema(TOPIC, SomeAvroDatum.getClassSchema()); + ByteBuffer byteBuffer = ByteBuffer.allocate(4); + converter.putSubjectAndId(TOPIC, id, byteBuffer); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + out.write(byteBuffer.array()); + // encode data + DatumWriter writer = new GenericDatumWriter(someAvroDatum.getSchema()); + // write avro datum to bytes + writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null)); + + InputRow inputRow = parser2.parse(ByteBuffer.wrap(out.toByteArray())); + + assertInputRowCorrect(inputRow); + } + + public static void assertInputRowCorrect(InputRow inputRow) + { + Collections.sort(DIMENSIONS); + assertEquals(DIMENSIONS, inputRow.getDimensions()); + assertEquals(DATE_TIME.getMillis(), inputRow.getTimestampFromEpoch()); + + // test dimensions + assertEquals(Collections.singletonList(String.valueOf(EVENT_TYPE_VALUE)), inputRow.getDimension(EVENT_TYPE)); + assertEquals(Collections.singletonList(String.valueOf(ID_VALUE)), inputRow.getDimension(ID)); + assertEquals(Collections.singletonList(String.valueOf(SOME_OTHER_ID_VALUE)), inputRow.getDimension(SOME_OTHER_ID)); + assertEquals(Collections.singletonList(String.valueOf(true)), inputRow.getDimension(IS_VALID)); + assertEquals( + Lists.transform(SOME_INT_ARRAY_VALUE, TO_STRING_INCLUDING_NULL), + inputRow.getDimension("someIntArray") + ); + assertEquals( + Lists.transform(SOME_STRING_ARRAY_VALUE, TO_STRING_INCLUDING_NULL), + inputRow.getDimension("someStringArray") + ); + // towards Map avro field as druid dimension, need to convert its toString() back to HashMap to check equality + assertEquals(1, inputRow.getDimension("someIntValueMap").size()); + assertEquals( + SOME_INT_VALUE_MAP_VALUE, new HashMap( + Maps.transformValues( + Splitter.on(",") + .withKeyValueSeparator("=") + .split(inputRow.getDimension("someIntValueMap").get(0).replaceAll("[\\{\\} ]", "")), + new Function() + { + @Nullable + @Override + public Integer apply(@Nullable String input) + { + return Integer.valueOf(input); + } + } + ) + ) + ); + assertEquals( + SOME_STRING_VALUE_MAP_VALUE, new HashMap( + Splitter.on(",") + .withKeyValueSeparator("=") + .split(inputRow.getDimension("someIntValueMap").get(0).replaceAll("[\\{\\} ]", "")) + ) + ); + assertEquals(Collections.singletonList(SOME_UNION_VALUE), inputRow.getDimension("someUnion")); + assertEquals(Collections.emptyList(), inputRow.getDimension("someNull")); + assertEquals(Collections.singletonList(String.valueOf(SOME_FIXED_VALUE)), inputRow.getDimension("someFixed")); + assertEquals( + Collections.singletonList(Arrays.toString(SOME_BYTES_VALUE.array())), + inputRow.getDimension("someBytes") + ); + assertEquals(Collections.singletonList(String.valueOf(MyEnum.ENUM1)), inputRow.getDimension("someEnum")); + 
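    // sub-records, like enums and fixed above, come back as their String.valueOf() form (see GenericRecordAsMap)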
assertEquals(Collections.singletonList(String.valueOf(SOME_RECORD_VALUE)), inputRow.getDimension("someRecord")); + + // test metrics + assertEquals(SOME_FLOAT_VALUE, inputRow.getFloatMetric("someFloat"), 0); + assertEquals(SOME_LONG_VALUE, inputRow.getLongMetric("someLong")); + assertEquals(SOME_INT_VALUE, inputRow.getLongMetric("someInt")); + } + + public static GenericRecord buildSomeAvroDatum() throws IOException + { + SomeAvroDatum datum = SomeAvroDatum.newBuilder() + .setTimestamp(DATE_TIME.getMillis()) + .setEventType(EVENT_TYPE_VALUE) + .setId(ID_VALUE) + .setSomeOtherId(SOME_OTHER_ID_VALUE) + .setIsValid(true) + .setSomeFloat(SOME_FLOAT_VALUE) + .setSomeInt(SOME_INT_VALUE) + .setSomeLong(SOME_LONG_VALUE) + .setSomeIntArray(SOME_INT_ARRAY_VALUE) + .setSomeStringArray(SOME_STRING_ARRAY_VALUE) + .setSomeIntValueMap(SOME_INT_VALUE_MAP_VALUE) + .setSomeStringValueMap(SOME_STRING_VALUE_MAP_VALUE) + .setSomeUnion(SOME_UNION_VALUE) + .setSomeFixed(SOME_FIXED_VALUE) + .setSomeBytes(SOME_BYTES_VALUE) + .setSomeNull(null) + .setSomeEnum(MyEnum.ENUM1) + .setSomeRecord(SOME_RECORD_VALUE) + .build(); + + return datum; + } +} diff --git a/pom.xml b/pom.xml index 3f529747326..d2d1ddfef91 100644 --- a/pom.xml +++ b/pom.xml @@ -103,7 +103,7 @@ extensions/kafka-extraction-namespace extensions/cloudfiles-extensions extensions/datasketches - + extensions/avro-extensions distribution
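For reviewers: on the read side the new pieces compose as below. This is a minimal sketch, assuming a reachable schema repo (the URL and topic are placeholders) and Kafka messages laid out as `[4-byte schema id][Avro binary]`; `kafkaMessagePayload` is a hypothetical ByteBuffer holding one such message.

```java
// Sketch only: all class names mirror code added in this patch.
Repository repository = new Avro1124RESTRepositoryClientWrapper("http://your-schema-repo:2876/schema-repo");
ByteBufferInputRowParser parser = new AvroStreamInputRowParser(
    new TimeAndDimsParseSpec(
        new TimestampSpec("timestamp", "millis", null),
        new DimensionsSpec(
            Arrays.asList("eventType", "id", "someOtherId", "isValid"),
            Collections.<String>emptyList(),
            null
        )
    ),
    new SchemaRepoBasedAvroBytesDecoder(new Avro1124SubjectAndIdConverter("aTopic"), repository)
);
InputRow row = parser.parse(kafkaMessagePayload); // decodes the id prefix, looks up the schema, reads the datum
```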