Add support for Avro OCF using InputFormat (#9671)

* Add AvroOCFInputFormat

* Support supplying a reader schema in AvroOCFInputFormat

* Add docs for Avro OCF input format

* Address review comments

* Address second round of review
Joseph Glanville 2020-05-17 04:09:12 +07:00 committed by GitHub
parent 46beaa0640
commit 793f386d6a
9 changed files with 493 additions and 14 deletions


@ -223,6 +223,53 @@ The Parquet `inputFormat` has the following components:
|flattenSpec| JSON Object |Define a [`flattenSpec`](#flattenspec) to extract nested values from a Parquet file. Note that only 'path' expressions are supported ('jq' is unavailable).| no (default will auto-discover 'root' level properties) |
| binaryAsString | Boolean | Specifies if the bytes Parquet column that is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no (default = false) |
### Avro OCF
> You need to include the [`druid-avro-extensions`](../development/extensions-core/avro.md) as an extension to use the Avro OCF input format.
The `inputFormat` to load data in Avro OCF (Object Container File) format. An example is:
```json
"ioConfig": {
"inputFormat": {
"type": "avro_ocf",
"flattenSpec": {
"useFieldDiscovery": true,
"fields": [
{
"type": "path",
"name": "someRecord_subInt",
"expr": "$.someRecord.subInt"
}
]
},
"schema": {
"namespace": "org.apache.druid.data.input",
"name": "SomeDatum",
"type": "record",
"fields" : [
{ "name": "timestamp", "type": "long" },
{ "name": "eventType", "type": "string" },
{ "name": "id", "type": "long" },
{ "name": "someRecord", "type": {
"type": "record", "name": "MySubRecord", "fields": [
{ "name": "subInt", "type": "int"},
{ "name": "subLong", "type": "long"}
]
}}]
},
"binaryAsString": false
},
...
}
```
| Field | Type | Description | Required |
|-------|------|-------------|----------|
|type| String| This should be set to `avro_ocf` to read Avro OCF files| yes |
|flattenSpec| JSON Object |Define a [`flattenSpec`](#flattenspec) to extract nested values from Avro records. Note that only 'path' expressions are supported ('jq' is unavailable).| no (default will auto-discover 'root' level properties) |
|schema| JSON Object |Define a reader schema to be used when parsing Avro records. This is useful when parsing multiple versions of Avro OCF file data; see the minimal example below. | no (default will decode using the writer schema contained in the OCF file) |
| binaryAsString | Boolean | Specifies if the bytes Avro column that is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no (default = false) |
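
If no `schema` is supplied, records are decoded with the writer schema embedded in the OCF file, so a minimal configuration (an illustrative sketch, with the surrounding `ioConfig` fields elided) looks like:

```json
"ioConfig": {
  "inputFormat": {
    "type": "avro_ocf"
  },
  ...
}
```
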
### FlattenSpec
The `flattenSpec` is located in `inputFormat` → `flattenSpec` and is responsible for


@ -181,6 +181,11 @@
<artifactId>jsr305</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>


@ -54,7 +54,8 @@ public class AvroExtensionsModule implements DruidModule
.registerSubtypes(
new NamedType(AvroStreamInputRowParser.class, "avro_stream"),
new NamedType(AvroHadoopInputRowParser.class, "avro_hadoop"),
new NamedType(AvroParseSpec.class, "avro")
new NamedType(AvroParseSpec.class, "avro"),
new NamedType(AvroOCFInputFormat.class, "avro_ocf")
)
.setMixInAnnotation(Repository.class, RepositoryMixIn.class)
.setMixInAnnotation(JsonUtil.class, JsonUtilMixIn.class)


@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.avro;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.NestedInputFormat;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import javax.annotation.Nullable;
import java.io.File;
import java.util.Map;
import java.util.Objects;
public class AvroOCFInputFormat extends NestedInputFormat
{
  private static final Logger LOGGER = new Logger(AvroOCFInputFormat.class);

  private final boolean binaryAsString;

  @Nullable
  private final Schema readerSchema;

  @JsonCreator
  public AvroOCFInputFormat(
      @JacksonInject @Json ObjectMapper mapper,
      @JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec,
      @JsonProperty("schema") @Nullable Map<String, Object> schema,
      @JsonProperty("binaryAsString") @Nullable Boolean binaryAsString
  ) throws Exception
  {
    super(flattenSpec);
    // If a reader schema is supplied, create the datum reader with that schema; otherwise use the writer schema
    if (schema != null) {
      String schemaStr = mapper.writeValueAsString(schema);
      LOGGER.debug("Initialising with reader schema: [%s]", schemaStr);
      this.readerSchema = new Schema.Parser().parse(schemaStr);
    } else {
      this.readerSchema = null;
    }
    this.binaryAsString = binaryAsString == null ? false : binaryAsString;
  }

  @Override
  public boolean isSplittable()
  {
    // In the future Avro OCF files could be split; the format allows for efficient splitting.
    // See https://avro.apache.org/docs/current/spec.html#Object+Container+Files for details
    return false;
  }

  @Override
  public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
  {
    return new AvroOCFReader(
        inputRowSchema,
        source,
        temporaryDirectory,
        readerSchema,
        getFlattenSpec(),
        binaryAsString
    );
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    if (!super.equals(o)) {
      return false;
    }
    AvroOCFInputFormat that = (AvroOCFInputFormat) o;
    return binaryAsString == that.binaryAsString &&
           Objects.equals(readerSchema, that.readerSchema);
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(super.hashCode(), binaryAsString, readerSchema);
  }
}
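
For orientation, here is a minimal sketch (not part of this change) of driving the new format programmatically, along the lines of the unit test added later in this commit; the class name, input path, temporary directory, and column names are assumptions for illustration:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.avro.AvroOCFInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.FileEntity;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.parsers.CloseableIterator;

import java.io.File;

public class AvroOCFReadExample
{
  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new DefaultObjectMapper();
    // null flattenSpec, null reader schema (decode with the writer schema), default binaryAsString
    final AvroOCFInputFormat format = new AvroOCFInputFormat(mapper, null, null, null);

    final InputRowSchema rowSchema = new InputRowSchema(
        new TimestampSpec("timestamp", "auto", null),
        new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("eventType"))),
        ImmutableList.of("someLong")
    );

    final InputEntityReader reader = format.createReader(
        rowSchema,
        new FileEntity(new File("/tmp/someAvroDatum.avro")), // assumed input OCF file
        new File("/tmp/avro-ocf-work")                       // assumed temporary fetch directory
    );

    try (CloseableIterator<InputRow> rows = reader.read()) {
      while (rows.hasNext()) {
        System.out.println(rows.next());
      }
    }
  }
}
```
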


@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.avro;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.IntermediateRowParsingReader;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.common.parsers.ObjectFlattener;
import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
import org.apache.druid.java.util.common.parsers.ParseException;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class AvroOCFReader extends IntermediateRowParsingReader<GenericRecord>
{
  private final InputRowSchema inputRowSchema;
  private final InputEntity source;
  private final File temporaryDirectory;
  private final ObjectFlattener<GenericRecord> recordFlattener;
  private Schema readerSchema;

  AvroOCFReader(
      InputRowSchema inputRowSchema,
      InputEntity source,
      File temporaryDirectory,
      @Nullable Schema readerSchema,
      JSONPathSpec flattenSpec,
      boolean binaryAsString
  )
  {
    this.inputRowSchema = inputRowSchema;
    this.source = source;
    this.temporaryDirectory = temporaryDirectory;
    this.readerSchema = readerSchema;
    this.recordFlattener = ObjectFlatteners.create(flattenSpec, new AvroFlattenerMaker(false, binaryAsString));
  }

  @Override
  protected CloseableIterator<GenericRecord> intermediateRowIterator() throws IOException
  {
    final Closer closer = Closer.create();
    final byte[] buffer = new byte[InputEntity.DEFAULT_FETCH_BUFFER_SIZE];
    try {
      final InputEntity.CleanableFile file = closer.register(source.fetch(temporaryDirectory, buffer));
      final GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
      final DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(file.file(), datumReader);
      final Schema writerSchema = dataFileReader.getSchema();
      if (readerSchema == null) {
        readerSchema = writerSchema;
      }
      datumReader.setSchema(writerSchema);
      datumReader.setExpected(readerSchema);
      closer.register(dataFileReader);

      return new CloseableIterator<GenericRecord>()
      {
        @Override
        public boolean hasNext()
        {
          return dataFileReader.hasNext();
        }

        @Override
        public GenericRecord next()
        {
          return dataFileReader.next();
        }

        @Override
        public void close() throws IOException
        {
          closer.close();
        }
      };
    }
    catch (Exception e) {
      closer.close();
      throw new RuntimeException(e);
    }
  }

  @Override
  protected List<InputRow> parseInputRows(GenericRecord intermediateRow) throws ParseException
  {
    return Collections.singletonList(
        MapInputRowParser.parse(
            inputRowSchema.getTimestampSpec(),
            inputRowSchema.getDimensionsSpec(),
            recordFlattener.flatten(intermediateRow)
        )
    );
  }

  @Override
  protected Map<String, Object> toMap(GenericRecord intermediateRow)
  {
    return recordFlattener.toMap(intermediateRow);
  }
}
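
As a side note, the reader-schema handling above is plain Avro schema resolution: the writer schema comes from the OCF header and an optional reader schema is applied via `setExpected()`. A standalone sketch of that mechanism, where the file path and the reduced reader schema are assumptions for illustration:

```java
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

import java.io.File;
import java.io.IOException;

public class SchemaResolutionExample
{
  public static void main(String[] args) throws IOException
  {
    // Reader schema with a subset of the writer's fields (assumed for illustration)
    final String readerSchemaJson =
        "{\"type\":\"record\",\"name\":\"SomeAvroDatum\",\"namespace\":\"org.apache.druid.data.input\","
        + "\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"}]}";

    final GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
    try (DataFileReader<GenericRecord> fileReader =
             new DataFileReader<>(new File("/tmp/someAvroDatum.avro"), datumReader)) {
      datumReader.setSchema(fileReader.getSchema());                        // writer schema from the file header
      datumReader.setExpected(new Schema.Parser().parse(readerSchemaJson)); // projected reader schema
      while (fileReader.hasNext()) {
        GenericRecord record = fileReader.next();
        System.out.println(record.get("timestamp"));
      }
    }
  }
}
```
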


@ -81,16 +81,8 @@ public class AvroHadoopInputRowParserTest
private static GenericRecord buildAvroFromFile(GenericRecord datum)
throws IOException
{
final File tmpDir = FileUtils.createTempDir();
// 0. write avro object into temp file.
File someAvroDatumFile = new File(tmpDir, "someAvroDatum.avro");
try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(
new SpecificDatumWriter<>()
)) {
dataFileWriter.create(SomeAvroDatum.getClassSchema(), someAvroDatumFile);
dataFileWriter.append(datum);
}
final File someAvroDatumFile = createAvroFile(datum);
final GenericRecord record;
// 3. read avro object from AvroStorage
@ -103,4 +95,18 @@ public class AvroHadoopInputRowParserTest
return record;
}
public static File createAvroFile(GenericRecord datum)
throws IOException
{
final File tmpDir = FileUtils.createTempDir();
File someAvroDatumFile = new File(tmpDir, "someAvroDatum.avro");
try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(
new SpecificDatumWriter<>()
)) {
dataFileWriter.create(SomeAvroDatum.getClassSchema(), someAvroDatumFile);
dataFileWriter.append(datum);
}
return someAvroDatumFile;
}
}


@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.avro;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.avro.generic.GenericRecord;
import org.apache.druid.data.input.AvroHadoopInputRowParserTest;
import org.apache.druid.data.input.AvroStreamInputRowParserTest;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.FileEntity;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
public class AvroOCFReaderTest
{
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Test
public void testParse() throws Exception
{
final ObjectMapper mapper = new DefaultObjectMapper();
mapper.setInjectableValues(
new InjectableValues.Std().addValue(ObjectMapper.class, mapper)
);
final InputEntityReader reader = createReader(mapper, null);
assertRow(reader);
}
@Test
public void testParseWithReaderSchema() throws Exception
{
final ObjectMapper mapper = new DefaultObjectMapper();
mapper.setInjectableValues(
new InjectableValues.Std().addValue(ObjectMapper.class, mapper)
);
// Read the data using a reduced reader schema, emulating an older schema version with fewer fields
String schemaStr = "{\n"
+ " \"namespace\": \"org.apache.druid.data.input\",\n"
+ " \"name\": \"SomeAvroDatum\",\n"
+ " \"type\": \"record\",\n"
+ " \"fields\" : [\n"
+ " {\"name\":\"timestamp\",\"type\":\"long\"},\n"
+ " {\"name\":\"eventType\",\"type\":\"string\"},\n"
+ " {\"name\":\"someLong\",\"type\":\"long\"}\n"
+ " ]\n"
+ "}";
TypeReference<Map<String, Object>> typeRef = new TypeReference<Map<String, Object>>()
{
};
final Map<String, Object> readerSchema = mapper.readValue(schemaStr, typeRef);
final InputEntityReader reader = createReader(mapper, readerSchema);
assertRow(reader);
}
@Test
public void testParseWithReaderSchemaAlias() throws Exception
{
final ObjectMapper mapper = new DefaultObjectMapper();
mapper.setInjectableValues(
new InjectableValues.Std().addValue(ObjectMapper.class, mapper)
);
// Read the data using a reader schema that aliases a field, emulating a rename across schema versions
String schemaStr = "{\n"
+ " \"namespace\": \"org.apache.druid.data.input\",\n"
+ " \"name\": \"SomeAvroDatum\",\n"
+ " \"type\": \"record\",\n"
+ " \"fields\" : [\n"
+ " {\"name\":\"timestamp\",\"type\":\"long\"},\n"
+ " {\"name\":\"someLong\",\"type\":\"long\"}\n,"
+ " {\"name\":\"eventClass\",\"type\":\"string\", \"aliases\": [\"eventType\"]}\n"
+ " ]\n"
+ "}";
TypeReference<Map<String, Object>> typeRef = new TypeReference<Map<String, Object>>()
{
};
final Map<String, Object> readerSchema = mapper.readValue(schemaStr, typeRef);
final InputEntityReader reader = createReader(mapper, readerSchema);
try (CloseableIterator<InputRow> iterator = reader.read()) {
Assert.assertTrue(iterator.hasNext());
final InputRow row = iterator.next();
// eventType is aliased to eventClass in the reader schema and should be transformed at read time
Assert.assertEquals("type-a", Iterables.getOnlyElement(row.getDimension("eventClass")));
Assert.assertFalse(iterator.hasNext());
}
}
private void assertRow(InputEntityReader reader) throws IOException
{
try (CloseableIterator<InputRow> iterator = reader.read()) {
Assert.assertTrue(iterator.hasNext());
final InputRow row = iterator.next();
Assert.assertEquals(DateTimes.of("2015-10-25T19:30:00.000Z"), row.getTimestamp());
Assert.assertEquals("type-a", Iterables.getOnlyElement(row.getDimension("eventType")));
Assert.assertEquals(679865987569912369L, row.getMetric("someLong"));
Assert.assertFalse(iterator.hasNext());
}
}
private InputEntityReader createReader(
ObjectMapper mapper,
Map<String, Object> readerSchema
) throws Exception
{
final GenericRecord someAvroDatum = AvroStreamInputRowParserTest.buildSomeAvroDatum();
final File someAvroFile = AvroHadoopInputRowParserTest.createAvroFile(someAvroDatum);
final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "auto", null);
final DimensionsSpec dimensionsSpec = new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of(
"eventType")));
final List<String> metricNames = ImmutableList.of("someLong");
final AvroOCFInputFormat inputFormat = new AvroOCFInputFormat(mapper, null, readerSchema, null);
final InputRowSchema schema = new InputRowSchema(timestampSpec, dimensionsSpec, metricNames);
final FileEntity entity = new FileEntity(someAvroFile);
return inputFormat.createReader(schema, entity, temporaryFolder.newFolder());
}
}


@ -311,7 +311,7 @@ const INPUT_FORMAT_FORM_FIELDS: Field<InputFormat>[] = [
name: 'type',
label: 'Input format',
type: 'string',
suggestions: ['json', 'csv', 'tsv', 'regex', 'parquet', 'orc'],
suggestions: ['json', 'csv', 'tsv', 'regex', 'parquet', 'orc', 'avro_ocf'],
info: (
<>
<p>The parser used to parse the data.</p>
@ -384,7 +384,7 @@ const INPUT_FORMAT_FORM_FIELDS: Field<InputFormat>[] = [
name: 'binaryAsString',
type: 'boolean',
defaultValue: false,
defined: (p: InputFormat) => p.type === 'parquet' || p.type === 'orc',
defined: (p: InputFormat) => p.type === 'parquet' || p.type === 'orc' || p.type === 'avro_ocf',
info: (
<>
Specifies if the bytes parquet column which is not logically marked as a string or enum type
@ -415,7 +415,12 @@ export function issueWithInputFormat(inputFormat: InputFormat | undefined): stri
export function inputFormatCanFlatten(inputFormat: InputFormat): boolean {
const inputFormatType = inputFormat.type;
return inputFormatType === 'json' || inputFormatType === 'parquet' || inputFormatType === 'orc';
return (
inputFormatType === 'json' ||
inputFormatType === 'parquet' ||
inputFormatType === 'orc' ||
inputFormatType === 'avro_ocf'
);
}
export interface TimestampSpec {
@ -1088,7 +1093,16 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F
label: 'File filter',
type: 'string',
required: true,
suggestions: ['*', '*.json', '*.json.gz', '*.csv', '*.tsv', '*.parquet', '*.orc'],
suggestions: [
'*',
'*.json',
'*.json.gz',
'*.csv',
'*.tsv',
'*.parquet',
'*.orc',
'*.avro',
],
info: (
<>
<ExternalLink
@ -2681,6 +2695,10 @@ function guessInputFormat(sampleData: string[]): InputFormat {
if (sampleDatum.startsWith('ORC')) {
return inputFormatFromType('orc');
}
if (sampleDatum.startsWith('Obj1')) {
return inputFormatFromType('avro_ocf');
}
}
return inputFormatFromType('regex');


@ -133,6 +133,7 @@ MiddleManagers
Montréal
Murmur3
NFS
OCF
OLAP
OOMs
OpenJDK