Merge pull request #1858 from zhaown/avro-module

Support avro ingestion for realtime & hadoop batch indexing
This commit is contained in:
Fangjin Yang 2016-01-04 19:02:10 -08:00
commit d413808e66
19 changed files with 1587 additions and 1 deletions

View File

@ -137,6 +137,7 @@ Is a type of inputSpec where a static path to where the data files are located i
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|paths|Array of String|An array of input paths indicating where the raw data is located.|yes|
|inputFormat|String|The input format of the data files. Default is `org.apache.hadoop.mapreduce.lib.input.TextInputFormat`, or `org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat` if `combineText` in tuningConfig is `true`.|no|
For example, using the static input paths:
@ -154,6 +155,7 @@ Is a type of inputSpec that expects data to be laid out in a specific path forma
|inputPath|String|Base path to append the expected time path to.|yes|
|filePattern|String|Pattern that files should match to be included.|yes|
|pathFormat|String|Joda date-time format for each directory. Default value is `"'y'=yyyy/'m'=MM/'d'=dd/'H'=HH"`; see the [Joda documentation](http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html) for details.|no|
|inputFormat|String|The input format of the data files. Default is `org.apache.hadoop.mapreduce.lib.input.TextInputFormat`, or `org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat` if `combineText` in tuningConfig is `true`.|no|
For example, if the sample config were run with the interval 2012-06-01/2012-06-02, it would expect data at the paths

View File

@ -91,6 +91,112 @@ If `type` is not included, the parser defaults to `string`.
| type | String | This should say `protobuf`. | no |
| parseSpec | JSON Object | Specifies the format of the data. | yes |
### Avro Stream Parser
This is for realtime ingestion.
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `avro_stream`. | no |
| avroBytesDecoder | JSON Object | Specifies how to decode bytes to Avro record. | yes |
| parseSpec | JSON Object | Specifies the format of the data. | yes |
For example, using Avro stream parser with schema repo Avro bytes decoder:
```json
"parser" : {
"type" : "avro_stream",
"avroBytesDecoder" : {
"type" : "schema_repo",
"subjectAndIdConverter" : {
"type" : "avro_1124",
"topic" : "${YOUR_TOPIC}"
},
"schemaRepository" : {
"type" : "avro_1124_rest_client",
"url" : "${YOUR_SCHEMA_REPO_END_POINT}",
}
},
"parsSpec" : {
"format" : "timeAndDims",
"timestampSpec" : {},
"dimensionsSpec" : {}
}
}
```
#### Avro Bytes Decoder
If `type` is not included, the avroBytesDecoder defaults to `schema_repo`.
##### SchemaRepo Based Avro Bytes Decoder
This Avro bytes decoder first extracts a `subject` and `id` from the input message bytes, then uses them to look up the Avro schema with which to decode the Avro record from the remaining bytes. Details can be found in [schema repo](https://github.com/schema-repo/schema-repo) and [AVRO-1124](https://issues.apache.org/jira/browse/AVRO-1124). You will need an HTTP service such as schema repo to hold the Avro schemas. For schema registration on the message producer side, you can refer to `io.druid.data.input.AvroStreamInputRowParserTest#testParse()`; a minimal sketch of that flow is shown after the tables below.
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `schema_repo`. | no |
| subjectAndIdConverter | JSON Object | Specifies how to extract the subject and id from the message bytes. | yes |
| schemaRepository | JSON Object | Specifies how to look up the Avro schema from the subject and id. | yes |
##### Avro-1124 Subject And Id Converter
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `avro_1124`. | no |
| topic | String | Specifies the topic of your kafka stream. | yes |
##### Avro-1124 Schema Repository
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `avro_1124_rest_client`. | no |
| url | String | Specifies the endpoint url of your Avro-1124 schema repository. | yes |
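For reference, the producer-side flow exercised by `io.druid.data.input.AvroStreamInputRowParserTest#testParse()` looks roughly like the minimal sketch below: register the writer schema under the Kafka topic, write the returned 4-byte schema id at the head of the payload, then append the Avro-encoded record. The class name, repository URL, and topic parameters here are illustrative placeholders, not part of the extension.

```java
import io.druid.data.input.schemarepo.Avro1124RESTRepositoryClientWrapper;
import io.druid.data.input.schemarepo.Avro1124SubjectAndIdConverter;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.schemarepo.Repository;
import org.schemarepo.SchemaValidationException;
import org.schemarepo.api.TypedSchemaRepository;
import org.schemarepo.api.converter.AvroSchemaConverter;
import org.schemarepo.api.converter.IdentityConverter;
import org.schemarepo.api.converter.IntegerConverter;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

public class AvroProducerSketch
{
  /**
   * Registers the record's schema under the given topic and returns the bytes to publish to Kafka:
   * a 4-byte schema id followed by the Avro-encoded record.
   */
  public static byte[] encode(String schemaRepoUrl, String topic, GenericRecord record)
      throws IOException, SchemaValidationException
  {
    // register the writer schema and obtain its integer schema id
    Repository repository = new Avro1124RESTRepositoryClientWrapper(schemaRepoUrl);
    TypedSchemaRepository<Integer, Schema, String> repositoryClient = new TypedSchemaRepository<Integer, Schema, String>(
        repository,
        new IntegerConverter(),
        new AvroSchemaConverter(),
        new IdentityConverter()
    );
    Integer id = repositoryClient.registerSchema(topic, record.getSchema());

    // serialize the schema id into the first 4 bytes, which is where Avro1124SubjectAndIdConverter reads it back
    ByteBuffer idBytes = ByteBuffer.allocate(4);
    new Avro1124SubjectAndIdConverter(topic).putSubjectAndId(topic, id, idBytes);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    out.write(idBytes.array());

    // append the Avro-encoded record; the resulting byte array is the Kafka message payload
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(record.getSchema());
    writer.write(record, EncoderFactory.get().directBinaryEncoder(out, null));
    return out.toByteArray();
  }
}
```

On the consumer side, `SchemaRepoBasedAvroBytesDecoder` reverses this: it extracts the 4-byte id, looks up the schema in the repository, and decodes the rest of the payload into a `GenericRecord`.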
### Avro Hadoop Parser
This is for batch ingestion using the HadoopDruidIndexer. The `inputFormat` of `inputSpec` in `ioConfig` must be set to `"io.druid.data.input.avro.AvroValueInputFormat"`. You may want to set the Avro reader's schema in `jobProperties` in `tuningConfig`, e.g. `"avro.schema.input.value.path": "/path/to/your/schema.avsc"` or `"avro.schema.input.value": "your_schema_JSON_object"`. If the reader's schema is not set, the schema in the Avro object container file will be used; see the [Avro specification](http://avro.apache.org/docs/1.7.7/spec.html#Schema+Resolution) for details on schema resolution.
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | String | This should say `avro_hadoop`. | no |
| parseSpec | JSON Object | Specifies the format of the data. | yes |
| fromPigAvroStorage | Boolean | Specifies whether the data file is stored using AvroStorage. | no (default == false) |
For example, using Avro Hadoop parser with custom reader's schema file:
```json
{
"type" : "index_hadoop",
"hadoopDependencyCoordinates" : ["io.druid.extensions:druid-avro-extensions"],
"spec" : {
"dataSchema" : {
"dataSource" : "",
"parser" : {
"type" : "avro_hadoop",
"parsSpec" : {
"format" : "timeAndDims",
"timestampSpec" : {},
"dimensionsSpec" : {}
}
}
},
"ioConfig" : {
"type" : "hadoop",
"inputSpec" : {
"type" : "static",
"inputFormat": "io.druid.data.input.avro.AvroValueInputFormat",
"paths" : ""
}
},
"tuningConfig" : {
"jobProperties" : {
"avro.schema.path.input.value" : "/path/to/my/schema.avsc",
}
}
}
}
```
### ParseSpec
If `format` is not included, the parseSpec defaults to `tsv`.

View File

@ -0,0 +1,107 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Druid - a distributed column store.
~ Copyright 2012 - 2015 Metamarkets Group Inc.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid.extensions</groupId>
<artifactId>druid-avro-extensions</artifactId>
<name>druid-avro-extensions</name>
<description>druid-avro-extensions</description>
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.9.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<properties>
<schemarepo.version>0.1.3</schemarepo.version>
<avro.version>1.7.7</avro.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
<classifier>hadoop2</classifier>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-api</artifactId>
</dependency>
<dependency>
<groupId>org.schemarepo</groupId>
<artifactId>schema-repo-api</artifactId>
<version>${schemarepo.version}</version>
</dependency>
<dependency>
<groupId>org.schemarepo</groupId>
<artifactId>schema-repo-client</artifactId>
<version>${schemarepo.version}</version>
</dependency>
<dependency>
<groupId>org.schemarepo</groupId>
<artifactId>schema-repo-avro</artifactId>
<version>${schemarepo.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</dependency>
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.pig</groupId>
<artifactId>pig</artifactId>
<version>0.15.0</version>
<classifier>h2</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.pig</groupId>
<artifactId>piggybank</artifactId>
<version>0.15.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>${avro.version}</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,70 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.ParseSpec;
import org.apache.avro.generic.GenericRecord;
import java.util.List;
public class AvroHadoopInputRowParser implements InputRowParser<GenericRecord>
{
private final ParseSpec parseSpec;
private final List<String> dimensions;
private final boolean fromPigAvroStorage;
@JsonCreator
public AvroHadoopInputRowParser(
@JsonProperty("parseSpec") ParseSpec parseSpec,
@JsonProperty("fromPigAvroStorage") Boolean fromPigAvroStorage
)
{
this.parseSpec = parseSpec;
this.dimensions = parseSpec.getDimensionsSpec().getDimensions();
this.fromPigAvroStorage = fromPigAvroStorage == null ? false : fromPigAvroStorage;
}
@Override
public InputRow parse(GenericRecord record)
{
return AvroStreamInputRowParser.parseGenericRecord(record, parseSpec, dimensions, fromPigAvroStorage);
}
@JsonProperty
@Override
public ParseSpec getParseSpec()
{
return parseSpec;
}
@JsonProperty
public boolean isFromPigAvroStorage()
{
return fromPigAvroStorage;
}
@Override
public InputRowParser withParseSpec(ParseSpec parseSpec)
{
return new AvroHadoopInputRowParser(parseSpec, fromPigAvroStorage);
}
}

View File

@ -0,0 +1,117 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.data.input.avro.AvroBytesDecoder;
import io.druid.data.input.avro.GenericRecordAsMap;
import io.druid.data.input.impl.ParseSpec;
import io.druid.data.input.impl.TimestampSpec;
import org.apache.avro.generic.GenericRecord;
import org.joda.time.DateTime;
import java.nio.ByteBuffer;
import java.util.List;
public class AvroStreamInputRowParser implements ByteBufferInputRowParser
{
private final ParseSpec parseSpec;
private final List<String> dimensions;
private final AvroBytesDecoder avroBytesDecoder;
@JsonCreator
public AvroStreamInputRowParser(
@JsonProperty("parseSpec") ParseSpec parseSpec,
@JsonProperty("avroBytesDecoder") AvroBytesDecoder avroBytesDecoder
)
{
this.parseSpec = parseSpec;
this.dimensions = parseSpec.getDimensionsSpec().getDimensions();
this.avroBytesDecoder = avroBytesDecoder;
}
@Override
public InputRow parse(ByteBuffer input)
{
return parseGenericRecord(avroBytesDecoder.parse(input), parseSpec, dimensions, false);
}
protected static InputRow parseGenericRecord(
GenericRecord record, ParseSpec parseSpec, List<String> dimensions, boolean fromPigAvroStorage
)
{
GenericRecordAsMap genericRecordAsMap = new GenericRecordAsMap(record, fromPigAvroStorage);
TimestampSpec timestampSpec = parseSpec.getTimestampSpec();
DateTime dateTime = timestampSpec.extractTimestamp(genericRecordAsMap);
return new MapBasedInputRow(dateTime, dimensions, genericRecordAsMap);
}
@JsonProperty
@Override
public ParseSpec getParseSpec()
{
return parseSpec;
}
@JsonProperty
public AvroBytesDecoder getAvroBytesDecoder()
{
return avroBytesDecoder;
}
@Override
public ByteBufferInputRowParser withParseSpec(ParseSpec parseSpec)
{
return new AvroStreamInputRowParser(
parseSpec,
avroBytesDecoder
);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
AvroStreamInputRowParser that = (AvroStreamInputRowParser) o;
if (!parseSpec.equals(that.parseSpec)) {
return false;
}
if (!dimensions.equals(that.dimensions)) {
return false;
}
return avroBytesDecoder.equals(that.avroBytesDecoder);
}
@Override
public int hashCode()
{
int result = parseSpec.hashCode();
result = 31 * result + dimensions.hashCode();
result = 31 * result + avroBytesDecoder.hashCode();
return result;
}
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.avro;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.avro.generic.GenericRecord;
import java.nio.ByteBuffer;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = SchemaRepoBasedAvroBytesDecoder.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "schema_repo", value = SchemaRepoBasedAvroBytesDecoder.class)
})
public interface AvroBytesDecoder
{
GenericRecord parse(ByteBuffer bytes);
}

View File

@ -0,0 +1,90 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.avro;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import io.druid.data.input.AvroHadoopInputRowParser;
import io.druid.data.input.AvroStreamInputRowParser;
import io.druid.data.input.schemarepo.Avro1124RESTRepositoryClientWrapper;
import io.druid.initialization.DruidModule;
import org.schemarepo.InMemoryRepository;
import org.schemarepo.Repository;
import org.schemarepo.ValidatorFactory;
import org.schemarepo.json.GsonJsonUtil;
import org.schemarepo.json.JsonUtil;
import java.util.Arrays;
import java.util.List;
public class AvroExtensionsModule implements DruidModule
{
public AvroExtensionsModule() {}
@Override
public List<? extends Module> getJacksonModules()
{
return Arrays.asList(
new SimpleModule("AvroInputRowParserModule")
.registerSubtypes(
new NamedType(AvroStreamInputRowParser.class, "avro_stream"),
new NamedType(AvroHadoopInputRowParser.class, "avro_hadoop")
)
.setMixInAnnotation(Repository.class, RepositoryMixIn.class)
.setMixInAnnotation(JsonUtil.class, JsonUtilMixIn.class)
.setMixInAnnotation(InMemoryRepository.class, InMemoryRepositoryMixIn.class)
);
}
@Override
public void configure(Binder binder)
{ }
}
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = GsonJsonUtil.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "gson", value = GsonJsonUtil.class)
})
abstract class JsonUtilMixIn
{
}
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = Avro1124RESTRepositoryClientWrapper.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "avro_1124_rest_client", value = Avro1124RESTRepositoryClientWrapper.class),
@JsonSubTypes.Type(name = "in_memory_for_unit_test", value = InMemoryRepository.class)
})
abstract class RepositoryMixIn
{
}
abstract class InMemoryRepositoryMixIn
{
@JsonCreator
public InMemoryRepositoryMixIn(@JsonProperty("validators") ValidatorFactory validators)
{
}
}

View File

@ -0,0 +1,73 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.avro;
import com.metamx.common.logger.Logger;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
public class AvroValueInputFormat extends FileInputFormat<NullWritable, GenericRecord>
{
private static final Logger log = new Logger(AvroValueInputFormat.class);
private static final String CONF_INPUT_VALUE_SCHEMA_PATH = "avro.schema.input.value.path";
/**
* {@inheritDoc}
*/
@Override
public RecordReader<NullWritable, GenericRecord> createRecordReader(
InputSplit split, TaskAttemptContext context
) throws IOException, InterruptedException
{
Schema readerSchema = AvroJob.getInputValueSchema(context.getConfiguration());
if (readerSchema == null) {
String schemaFilePath = context.getConfiguration().get(CONF_INPUT_VALUE_SCHEMA_PATH);
if (StringUtils.isNotBlank(schemaFilePath)) {
log.info("Using file: %s as reader schema.", schemaFilePath);
FSDataInputStream inputStream = FileSystem.get(context.getConfiguration()).open(new Path(schemaFilePath));
try {
readerSchema = new Schema.Parser().parse(inputStream);
}
finally {
inputStream.close();
}
}
}
if (null == readerSchema) {
log.warn("Reader schema was not set. Use AvroJob.setInputKeySchema() if desired.");
log.info("Using a reader schema equal to the writer schema.");
}
return new AvroValueRecordReader(readerSchema);
}
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.avro;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapreduce.AvroRecordReaderBase;
import org.apache.hadoop.io.NullWritable;
import java.io.IOException;
public class AvroValueRecordReader extends AvroRecordReaderBase<NullWritable, GenericRecord, GenericRecord>
{
public AvroValueRecordReader(Schema readerSchema)
{
super(readerSchema);
}
/**
* {@inheritDoc}
*/
@Override
public NullWritable getCurrentKey() throws IOException, InterruptedException
{
return NullWritable.get();
}
/**
* {@inheritDoc}
*/
@Override
public GenericRecord getCurrentValue() throws IOException, InterruptedException
{
return getCurrentRecord();
}
}

View File

@ -0,0 +1,151 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.avro;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class GenericRecordAsMap implements Map<String, Object>
{
private final GenericRecord record;
private final boolean fromPigAvroStorage;
private static final Function<Object, String> PIG_AVRO_STORAGE_ARRAY_TO_STRING_INCLUDING_NULL = new Function<Object, String>()
{
@Nullable
@Override
public String apply(Object input)
{
return String.valueOf(((GenericRecord) input).get(0));
}
};
public GenericRecordAsMap(GenericRecord record, boolean fromPigAvroStorage)
{
this.record = record;
this.fromPigAvroStorage = fromPigAvroStorage;
}
@Override
public int size()
{
throw new UnsupportedOperationException();
}
@Override
public boolean isEmpty()
{
throw new UnsupportedOperationException();
}
@Override
public boolean containsKey(Object key)
{
throw new UnsupportedOperationException();
}
@Override
public boolean containsValue(Object value)
{
throw new UnsupportedOperationException();
}
/**
* When used in MapBasedRow, fields in the GenericRecord will be interpreted as follows:
* <ul>
* <li> avro schema type -> druid dimension:</li>
* <ul>
* <li>null, boolean, int, long, float, double, string, Records, Enums, Maps, Fixed -> String, using String.valueOf</li>
* <li>bytes -> Arrays.toString() </li>
* <li>Arrays -> List&lt;String&gt;, using Lists.transform(&lt;List&gt;dimValue, TO_STRING_INCLUDING_NULL)</li>
* </ul>
* <li> avro schema type -> druid metric:</li>
* <ul>
* <li>null -> 0F/0L</li>
* <li>int, long, float, double -> Float/Long, using Number.floatValue()/Number.longValue()</li>
* <li>string -> Float/Long, using Float.valueOf()/Long.valueOf()</li>
* <li>boolean, bytes, Arrays, Records, Enums, Maps, Fixed -> ParseException</li>
* </ul>
* </ul>
*/
@Override
public Object get(Object key)
{
Object field = record.get(key.toString());
if (fromPigAvroStorage && field instanceof GenericData.Array) {
return Lists.transform((List) field, PIG_AVRO_STORAGE_ARRAY_TO_STRING_INCLUDING_NULL);
}
if (field instanceof ByteBuffer) {
return Arrays.toString(((ByteBuffer) field).array());
}
return field;
}
@Override
public Object put(String key, Object value)
{
throw new UnsupportedOperationException();
}
@Override
public Object remove(Object key)
{
throw new UnsupportedOperationException();
}
@Override
public void putAll(Map<? extends String, ?> m)
{
throw new UnsupportedOperationException();
}
@Override
public void clear()
{
throw new UnsupportedOperationException();
}
@Override
public Set<String> keySet()
{
throw new UnsupportedOperationException();
}
@Override
public Collection<Object> values()
{
throw new UnsupportedOperationException();
}
@Override
public Set<Entry<String, Object>> entrySet()
{
throw new UnsupportedOperationException();
}
}

View File

@ -0,0 +1,118 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.avro;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.metamx.common.Pair;
import com.metamx.common.parsers.ParseException;
import io.druid.data.input.schemarepo.SubjectAndIdConverter;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.util.ByteBufferInputStream;
import org.schemarepo.Repository;
import org.schemarepo.api.TypedSchemaRepository;
import org.schemarepo.api.converter.AvroSchemaConverter;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
public class SchemaRepoBasedAvroBytesDecoder<SUBJECT, ID> implements AvroBytesDecoder
{
private final TypedSchemaRepository<ID, Schema, SUBJECT> typedRepository;
private final SubjectAndIdConverter<SUBJECT, ID> subjectAndIdConverter;
private final Repository schemaRepository;
@JsonCreator
public SchemaRepoBasedAvroBytesDecoder(
@JsonProperty("subjectAndIdConverter") SubjectAndIdConverter<SUBJECT, ID> subjectAndIdConverter,
@JsonProperty("schemaRepository") Repository schemaRepository
)
{
this.subjectAndIdConverter = subjectAndIdConverter;
this.schemaRepository = schemaRepository;
typedRepository = new TypedSchemaRepository<ID, Schema, SUBJECT>(
schemaRepository,
subjectAndIdConverter.getIdConverter(),
new AvroSchemaConverter(false),
subjectAndIdConverter.getSubjectConverter()
);
}
@JsonProperty
public Repository getSchemaRepository()
{
return schemaRepository;
}
@JsonProperty
public SubjectAndIdConverter<SUBJECT, ID> getSubjectAndIdConverter()
{
return subjectAndIdConverter;
}
@Override
public GenericRecord parse(ByteBuffer bytes)
{
Pair<SUBJECT, ID> subjectAndId = subjectAndIdConverter.getSubjectAndId(bytes);
Schema schema = typedRepository.getSchema(subjectAndId.lhs, subjectAndId.rhs);
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
ByteBufferInputStream inputStream = new ByteBufferInputStream(Collections.singletonList(bytes));
try {
return reader.read(null, DecoderFactory.get().binaryDecoder(inputStream, null));
}
catch (IOException e) {
throw new ParseException(e, "Failed to decode Avro message.");
}
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SchemaRepoBasedAvroBytesDecoder<?, ?> that = (SchemaRepoBasedAvroBytesDecoder<?, ?>) o;
if (subjectAndIdConverter != null
? !subjectAndIdConverter.equals(that.subjectAndIdConverter)
: that.subjectAndIdConverter != null) {
return false;
}
return !(schemaRepository != null
? !schemaRepository.equals(that.schemaRepository)
: that.schemaRepository != null);
}
@Override
public int hashCode()
{
int result = subjectAndIdConverter != null ? subjectAndIdConverter.hashCode() : 0;
result = 31 * result + (schemaRepository != null ? schemaRepository.hashCode() : 0);
return result;
}
}

View File

@ -0,0 +1,70 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.schemarepo;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.schemarepo.client.Avro1124RESTRepositoryClient;
public class Avro1124RESTRepositoryClientWrapper extends Avro1124RESTRepositoryClient
{
private final String url;
public Avro1124RESTRepositoryClientWrapper(
@JsonProperty("url") String url
)
{
super(url);
this.url = url;
}
@JsonIgnore
@Override
public String getStatus()
{
return super.getStatus();
}
@JsonProperty
public String getUrl()
{
return url;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Avro1124RESTRepositoryClientWrapper that = (Avro1124RESTRepositoryClientWrapper) o;
return !(url != null ? !url.equals(that.url) : that.url != null);
}
@Override
public int hashCode()
{
return url != null ? url.hashCode() : 0;
}
}

View File

@ -0,0 +1,100 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.schemarepo;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.metamx.common.Pair;
import org.schemarepo.api.converter.Converter;
import org.schemarepo.api.converter.IdentityConverter;
import org.schemarepo.api.converter.IntegerConverter;
import java.nio.ByteBuffer;
/**
* This implementation uses the injected Kafka topic name as the subject name and an integer as the schema id. Before
* sending an Avro message to the Kafka broker, you need to register the schema with a schema repository, get back the
* schema id, serialize it to 4 bytes, and prepend them to the message payload. On the reading end, you extract those
* 4 bytes from the raw message, deserialize the id, and return it together with the topic name, with which you can
* look up the Avro schema.
*
* @see SubjectAndIdConverter
*/
public class Avro1124SubjectAndIdConverter implements SubjectAndIdConverter<String, Integer>
{
private final String topic;
@JsonCreator
public Avro1124SubjectAndIdConverter(@JsonProperty("topic") String topic)
{
this.topic = topic;
}
@Override
public Pair<String, Integer> getSubjectAndId(ByteBuffer payload)
{
return new Pair<String, Integer>(topic, payload.getInt());
}
@Override
public void putSubjectAndId(String subject, Integer id, ByteBuffer payload)
{
payload.putInt(id);
}
@Override
public Converter<String> getSubjectConverter()
{
return new IdentityConverter();
}
@Override
public Converter<Integer> getIdConverter()
{
return new IntegerConverter();
}
@JsonProperty
public String getTopic()
{
return topic;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Avro1124SubjectAndIdConverter converter = (Avro1124SubjectAndIdConverter) o;
return !(topic != null ? !topic.equals(converter.topic) : converter.topic != null);
}
@Override
public int hashCode()
{
return topic != null ? topic.hashCode() : 0;
}
}

View File

@ -0,0 +1,59 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.schemarepo;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.metamx.common.Pair;
import org.schemarepo.api.converter.Converter;
import java.nio.ByteBuffer;
/**
* A schema repository is a registry service: you can register a string schema, which gives back a schema id for it,
* or look up the schema by that id.
* <p>
* In order to get the "latest" schema or handle compatibility enforcement on changes there has to be some way to group
* a set of schemas together and reason about the ordering of changes over these. <i>Subject</i> is introduced as
* the formal notion of <i>group</i>, defined as an ordered collection of mutually compatible schemas, according to <a href="https://issues.apache.org/jira/browse/AVRO-1124?focusedCommentId=13503967&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13503967">
* Scott Carey on AVRO-1124</a>.
* <p>
* So you can register a string schema under a specific subject, get a schema id, and then query the schema using the
* subject and schema id pair. Working with Kafka and Avro, it is natural to use the Kafka topic as the subject name and
* an incrementing integer as the schema id, serializing and attaching them to the message payload on the producer side,
* and extracting and deserializing them from the payload on the consumer side; this is what
* {@link Avro1124SubjectAndIdConverter} implements.
* <p>
* You can implement your own SubjectAndIdConverter for your scenario, for example using the canonical name of the Avro
* schema as the subject name and an incrementing short integer serialized as a varint as the id.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = Avro1124SubjectAndIdConverter.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "avro_1124", value = Avro1124SubjectAndIdConverter.class)
})
public interface SubjectAndIdConverter<SUBJECT, ID>
{
Pair<SUBJECT, ID> getSubjectAndId(ByteBuffer payload);
void putSubjectAndId(SUBJECT subject, ID id, ByteBuffer payload);
Converter<SUBJECT> getSubjectConverter();
Converter<ID> getIdConverter();
}

View File

@ -0,0 +1 @@
io.druid.data.input.avro.AvroExtensionsModule

View File

@ -0,0 +1,31 @@
[{
"namespace": "io.druid.data.input",
"name": "SomeAvroDatum",
"type": "record",
"fields" : [
{"name":"timestamp","type":"long"},
{"name":"eventType","type":"string"},
{"name":"id","type":"long"},
{"name":"someOtherId","type":"long"},
{"name":"isValid","type":"boolean"},
{"name":"someIntArray","type":{"type":"array","items":"int"}},
{"name":"someStringArray","type":{"type":"array","items":"string"}},
{"name":"someIntValueMap","type":{"type":"map","values":"int"}},
{"name":"someStringValueMap","type":{"type":"map","values":"string"}},
{"name":"someUnion","type":["null","string"]},
{"name":"someNull","type":"null"},
{"name":"someFixed","type":{"type":"fixed","size":16,"name":"MyFixed"}},
{"name":"someBytes","type":"bytes"},
{"name":"someEnum","type":{"type":"enum","name":"MyEnum","symbols":["ENUM0","ENUM1","ENUM2"]}},
{"name":"someRecord","type":{
"type":"record","name":"MySubRecord","fields":[
{"name":"subInt","type":"int"},
{"name":"subLong","type":"long"}
]
}},
{"name":"someLong","type":"long"},
{"name":"someInt","type":"int"},
{"name":"someFloat","type":"float"}
]
}]

View File

@ -0,0 +1,132 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.io.Closeables;
import com.google.common.io.Files;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.FileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.FileUtils;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import static io.druid.data.input.AvroStreamInputRowParserTest.PARSE_SPEC;
import static io.druid.data.input.AvroStreamInputRowParserTest.assertInputRowCorrect;
import static io.druid.data.input.AvroStreamInputRowParserTest.buildSomeAvroDatum;
public class AvroHadoopInputRowParserTest
{
private final ObjectMapper jsonMapper = new ObjectMapper();
@Test
public void testParseNotFromPigAvroStorage() throws IOException
{
testParse(buildSomeAvroDatum(), false);
}
@Test
public void testParseFromPiggyBankAvroStorage() throws IOException
{
testParse(buildPiggyBankAvro(), false);
}
@Test
public void testParseFromPigAvroStorage() throws IOException
{
testParse(buildPigAvro(), true);
}
private void testParse(GenericRecord record, boolean fromPigAvroStorage) throws IOException
{
AvroHadoopInputRowParser parser = new AvroHadoopInputRowParser(PARSE_SPEC, fromPigAvroStorage);
AvroHadoopInputRowParser parser2 = jsonMapper.readValue(
jsonMapper.writeValueAsBytes(parser),
AvroHadoopInputRowParser.class
);
InputRow inputRow = parser2.parse(record);
assertInputRowCorrect(inputRow);
}
public static GenericRecord buildPigAvro() throws IOException
{
return buildPigAvro(buildSomeAvroDatum(), "AvroStorage", "AvroStorage");
}
public static GenericRecord buildPiggyBankAvro() throws IOException
{
return buildPigAvro(
buildSomeAvroDatum(),
"org.apache.pig.piggybank.storage.avro.AvroStorage",
"org.apache.pig.piggybank.storage.avro.AvroStorage('field7','{\"type\":\"map\",\"values\":\"int\"}','field8','{\"type\":\"map\",\"values\":\"string\"}')"
);
}
private static GenericRecord buildPigAvro(GenericRecord datum, String inputStorage, String outputStorage)
throws IOException
{
final File tmpDir = Files.createTempDir();
FileReader<GenericRecord> reader = null;
PigServer pigServer = null;
try {
// 0. write avro object into temp file.
File someAvroDatumFile = new File(tmpDir, "someAvroDatum.avro");
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(
new GenericDatumWriter<GenericRecord>()
);
dataFileWriter.create(SomeAvroDatum.getClassSchema(), someAvroDatumFile);
dataFileWriter.append(datum);
dataFileWriter.close();
// 1. read avro files into Pig
pigServer = new PigServer(ExecType.LOCAL);
pigServer.registerQuery(
String.format(
"A = LOAD '%s' USING %s;",
someAvroDatumFile,
inputStorage
)
);
// 2. write new avro file using AvroStorage
File outputDir = new File(tmpDir, "output");
pigServer.store("A", String.valueOf(outputDir), outputStorage);
// 3. read avro object from AvroStorage
reader = DataFileReader.openReader(
new File(outputDir, "part-m-00000.avro"),
new GenericDatumReader<GenericRecord>()
);
return reader.next();
}
finally {
if (pigServer != null) {
pigServer.shutdown();
}
Closeables.close(reader, true);
FileUtils.deleteDirectory(tmpDir);
}
}
}

View File

@ -0,0 +1,273 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.common.base.Function;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.druid.data.input.avro.AvroExtensionsModule;
import io.druid.data.input.avro.SchemaRepoBasedAvroBytesDecoder;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.TimeAndDimsParseSpec;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.data.input.schemarepo.Avro1124RESTRepositoryClientWrapper;
import io.druid.data.input.schemarepo.Avro1124SubjectAndIdConverter;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.joda.time.DateTime;
import org.junit.Before;
import org.junit.Test;
import org.schemarepo.InMemoryRepository;
import org.schemarepo.Repository;
import org.schemarepo.SchemaValidationException;
import org.schemarepo.api.TypedSchemaRepository;
import org.schemarepo.api.converter.AvroSchemaConverter;
import org.schemarepo.api.converter.IdentityConverter;
import org.schemarepo.api.converter.IntegerConverter;
import javax.annotation.Nullable;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
public class AvroStreamInputRowParserTest
{
public static final String EVENT_TYPE = "eventType";
public static final String ID = "id";
public static final String SOME_OTHER_ID = "someOtherId";
public static final String IS_VALID = "isValid";
public static final String TOPIC = "aTopic";
public static final String EVENT_TYPE_VALUE = "type-a";
public static final long ID_VALUE = 1976491L;
public static final long SOME_OTHER_ID_VALUE = 6568719896L;
public static final float SOME_FLOAT_VALUE = 0.23555f;
public static final int SOME_INT_VALUE = 1;
public static final long SOME_LONG_VALUE = 679865987569912369L;
public static final DateTime DATE_TIME = new DateTime(2015, 10, 25, 19, 30);
public static final List<String> DIMENSIONS = Arrays.asList(EVENT_TYPE, ID, SOME_OTHER_ID, IS_VALID);
public static final TimeAndDimsParseSpec PARSE_SPEC = new TimeAndDimsParseSpec(
new TimestampSpec("timestamp", "millis", null),
new DimensionsSpec(DIMENSIONS, Collections.<String>emptyList(), null)
);
public static final MyFixed SOME_FIXED_VALUE = new MyFixed(ByteBuffer.allocate(16).array());
private static final long SUB_LONG_VALUE = 1543698L;
private static final int SUB_INT_VALUE = 4892;
public static final MySubRecord SOME_RECORD_VALUE = MySubRecord.newBuilder()
.setSubInt(SUB_INT_VALUE)
.setSubLong(SUB_LONG_VALUE)
.build();
public static final List<CharSequence> SOME_STRING_ARRAY_VALUE = Arrays.asList((CharSequence) "8", "4", "2", "1");
public static final List<Integer> SOME_INT_ARRAY_VALUE = Arrays.asList(1, 2, 4, 8);
public static final Map<CharSequence, Integer> SOME_INT_VALUE_MAP_VALUE = Maps.asMap(
new HashSet<CharSequence>(Arrays.asList("8", "2", "4", "1")), new Function<CharSequence, Integer>()
{
@Nullable
@Override
public Integer apply(@Nullable CharSequence input) { return Integer.parseInt(input.toString()); }
}
);
public static final Map<CharSequence, CharSequence> SOME_STRING_VALUE_MAP_VALUE = Maps.asMap(
new HashSet<CharSequence>(Arrays.asList("8", "2", "4", "1")), new Function<CharSequence, CharSequence>()
{
@Nullable
@Override
public CharSequence apply(@Nullable CharSequence input) { return input.toString(); }
}
);
public static final String SOME_UNION_VALUE = "string as union";
public static final ByteBuffer SOME_BYTES_VALUE = ByteBuffer.allocate(8);
private static final Function<Object, String> TO_STRING_INCLUDING_NULL = new Function<Object, String>()
{
public String apply(Object o) { return String.valueOf(o); }
};
private final ObjectMapper jsonMapper = new ObjectMapper();
@Before
public void before()
{
jsonMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
jsonMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
for (com.fasterxml.jackson.databind.Module jacksonModule : new AvroExtensionsModule().getJacksonModules()) {
jsonMapper.registerModule(jacksonModule);
}
}
@Test
public void testSerde() throws IOException
{
Repository repository = new Avro1124RESTRepositoryClientWrapper("http://github.io");
AvroStreamInputRowParser parser = new AvroStreamInputRowParser(
PARSE_SPEC,
new SchemaRepoBasedAvroBytesDecoder<String, Integer>(new Avro1124SubjectAndIdConverter(TOPIC), repository)
);
ByteBufferInputRowParser parser2 = jsonMapper.readValue(
jsonMapper.writeValueAsString(parser),
ByteBufferInputRowParser.class
);
assertEquals(parser, parser2);
}
@Test
public void testParse() throws SchemaValidationException, IOException
{
// serde test
Repository repository = new InMemoryRepository(null);
AvroStreamInputRowParser parser = new AvroStreamInputRowParser(
PARSE_SPEC,
new SchemaRepoBasedAvroBytesDecoder<String, Integer>(new Avro1124SubjectAndIdConverter(TOPIC), repository)
);
ByteBufferInputRowParser parser2 = jsonMapper.readValue(
jsonMapper.writeValueAsString(parser),
ByteBufferInputRowParser.class
);
repository = ((SchemaRepoBasedAvroBytesDecoder) ((AvroStreamInputRowParser) parser2).getAvroBytesDecoder()).getSchemaRepository();
// prepare data
GenericRecord someAvroDatum = buildSomeAvroDatum();
// encode schema id
Avro1124SubjectAndIdConverter converter = new Avro1124SubjectAndIdConverter(TOPIC);
TypedSchemaRepository<Integer, Schema, String> repositoryClient = new TypedSchemaRepository<Integer, Schema, String>(
repository,
new IntegerConverter(),
new AvroSchemaConverter(),
new IdentityConverter()
);
Integer id = repositoryClient.registerSchema(TOPIC, SomeAvroDatum.getClassSchema());
ByteBuffer byteBuffer = ByteBuffer.allocate(4);
converter.putSubjectAndId(TOPIC, id, byteBuffer);
ByteArrayOutputStream out = new ByteArrayOutputStream();
out.write(byteBuffer.array());
// encode data
DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(someAvroDatum.getSchema());
// write avro datum to bytes
writer.write(someAvroDatum, EncoderFactory.get().directBinaryEncoder(out, null));
InputRow inputRow = parser2.parse(ByteBuffer.wrap(out.toByteArray()));
assertInputRowCorrect(inputRow);
}
public static void assertInputRowCorrect(InputRow inputRow)
{
Collections.sort(DIMENSIONS);
assertEquals(DIMENSIONS, inputRow.getDimensions());
assertEquals(DATE_TIME.getMillis(), inputRow.getTimestampFromEpoch());
// test dimensions
assertEquals(Collections.singletonList(String.valueOf(EVENT_TYPE_VALUE)), inputRow.getDimension(EVENT_TYPE));
assertEquals(Collections.singletonList(String.valueOf(ID_VALUE)), inputRow.getDimension(ID));
assertEquals(Collections.singletonList(String.valueOf(SOME_OTHER_ID_VALUE)), inputRow.getDimension(SOME_OTHER_ID));
assertEquals(Collections.singletonList(String.valueOf(true)), inputRow.getDimension(IS_VALID));
assertEquals(
Lists.transform(SOME_INT_ARRAY_VALUE, TO_STRING_INCLUDING_NULL),
inputRow.getDimension("someIntArray")
);
assertEquals(
Lists.transform(SOME_STRING_ARRAY_VALUE, TO_STRING_INCLUDING_NULL),
inputRow.getDimension("someStringArray")
);
// for a Map Avro field used as a Druid dimension, convert its toString() back to a HashMap to check equality
assertEquals(1, inputRow.getDimension("someIntValueMap").size());
assertEquals(
SOME_INT_VALUE_MAP_VALUE, new HashMap<CharSequence, Integer>(
Maps.transformValues(
Splitter.on(",")
.withKeyValueSeparator("=")
.split(inputRow.getDimension("someIntValueMap").get(0).replaceAll("[\\{\\} ]", "")),
new Function<String, Integer>()
{
@Nullable
@Override
public Integer apply(@Nullable String input)
{
return Integer.valueOf(input);
}
}
)
)
);
assertEquals(
SOME_STRING_VALUE_MAP_VALUE, new HashMap<CharSequence, CharSequence>(
Splitter.on(",")
.withKeyValueSeparator("=")
.split(inputRow.getDimension("someIntValueMap").get(0).replaceAll("[\\{\\} ]", ""))
)
);
assertEquals(Collections.singletonList(SOME_UNION_VALUE), inputRow.getDimension("someUnion"));
assertEquals(Collections.emptyList(), inputRow.getDimension("someNull"));
assertEquals(Collections.singletonList(String.valueOf(SOME_FIXED_VALUE)), inputRow.getDimension("someFixed"));
assertEquals(
Collections.singletonList(Arrays.toString(SOME_BYTES_VALUE.array())),
inputRow.getDimension("someBytes")
);
assertEquals(Collections.singletonList(String.valueOf(MyEnum.ENUM1)), inputRow.getDimension("someEnum"));
assertEquals(Collections.singletonList(String.valueOf(SOME_RECORD_VALUE)), inputRow.getDimension("someRecord"));
// test metrics
assertEquals(SOME_FLOAT_VALUE, inputRow.getFloatMetric("someFloat"), 0);
assertEquals(SOME_LONG_VALUE, inputRow.getLongMetric("someLong"));
assertEquals(SOME_INT_VALUE, inputRow.getLongMetric("someInt"));
}
public static GenericRecord buildSomeAvroDatum() throws IOException
{
SomeAvroDatum datum = SomeAvroDatum.newBuilder()
.setTimestamp(DATE_TIME.getMillis())
.setEventType(EVENT_TYPE_VALUE)
.setId(ID_VALUE)
.setSomeOtherId(SOME_OTHER_ID_VALUE)
.setIsValid(true)
.setSomeFloat(SOME_FLOAT_VALUE)
.setSomeInt(SOME_INT_VALUE)
.setSomeLong(SOME_LONG_VALUE)
.setSomeIntArray(SOME_INT_ARRAY_VALUE)
.setSomeStringArray(SOME_STRING_ARRAY_VALUE)
.setSomeIntValueMap(SOME_INT_VALUE_MAP_VALUE)
.setSomeStringValueMap(SOME_STRING_VALUE_MAP_VALUE)
.setSomeUnion(SOME_UNION_VALUE)
.setSomeFixed(SOME_FIXED_VALUE)
.setSomeBytes(SOME_BYTES_VALUE)
.setSomeNull(null)
.setSomeEnum(MyEnum.ENUM1)
.setSomeRecord(SOME_RECORD_VALUE)
.build();
return datum;
}
}

View File

@ -103,7 +103,7 @@
<module>extensions/kafka-extraction-namespace</module>
<module>extensions/cloudfiles-extensions</module>
<module>extensions/datasketches</module>
<module>extensions/avro-extensions</module>
<!-- distribution packaging -->
<module>distribution</module>