Fix Avro support in Web Console (#10232)

* Fix Avro OCF detection prefix and run formation detection on raw input

* Support Avro Fixed and Enum types correctly

* Check Avro version byte in format detection

* Add test for AvroOCFReader.sample

Ensures that the Sampler doesn't receive raw input that it can't
serialize into JSON.

* Document Avro type handling

* Add TS unit tests for guessInputFormat
This commit is contained in:
Joseph Glanville 2020-10-08 11:08:22 +07:00 committed by GitHub
parent 2e50ada407
commit 7ce9ac4548
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 153 additions and 25 deletions

View File

@ -29,4 +29,23 @@ two Avro Parsers for stream ingestion and Hadoop batch ingestion.
See [Avro Hadoop Parser](../../ingestion/data-formats.md#avro-hadoop-parser) and [Avro Stream Parser](../../ingestion/data-formats.md#avro-stream-parser)
for more details about how to use these in an ingestion spec.
Make sure to [include](../../development/extensions.md#loading-extensions) `druid-avro-extensions` as an extension.
Additionally, it provides an InputFormat for reading Avro OCF files when using
[native batch indexing](../../ingestion/native-batch.md), see [Avro OCF](../../ingestion/data-formats.md#avro-ocf)
for details on how to ingest OCF files.
Make sure to [include](../../development/extensions.md#loading-extensions) `druid-avro-extensions` as an extension.
### Avro Types
Druid supports most Avro types natively, there are however some exceptions which are detailed here.
`union` types which aren't of the form `[null, otherType]` aren't supported at this time.
`bytes` and `fixed` Avro types will be returned by default as base64 encoded strings unless the `binaryAsString` option is enabled on the Avro parser.
This setting will decode these types as UTF-8 strings.
`enum` types will be returned as `string` of the enum symbol.
`record` and `map` types representing nested data can be ingested using [flattenSpec](../../ingestion/data-formats.md#flattenspec) on the parser.
Druid doesn't currently support Avro logical types, they will be ignored and fields will be handled according to the underlying primitive type.

View File

@ -227,6 +227,8 @@ The Parquet `inputFormat` has the following components:
> You need to include the [`druid-avro-extensions`](../development/extensions-core/avro.md) as an extension to use the Avro OCF input format.
> See the [Avro Types](../development/extensions-core/avro.md#avro-types) section for how Avro types are handled in Druid
The `inputFormat` to load data of Avro OCF format. An example is:
```json
"ioConfig": {
@ -343,6 +345,8 @@ Each line can be further parsed using [`parseSpec`](#parsespec).
> You need to include the [`druid-avro-extensions`](../development/extensions-core/avro.md) as an extension to use the Avro Hadoop Parser.
> See the [Avro Types](../development/extensions-core/avro.md#avro-types) section for how Avro types are handled in Druid
This parser is for [Hadoop batch ingestion](./hadoop.md).
The `inputFormat` of `inputSpec` in `ioConfig` must be set to `"org.apache.druid.data.input.avro.AvroValueInputFormat"`.
You may want to set Avro reader's schema in `jobProperties` in `tuningConfig`,
@ -865,6 +869,8 @@ an explicitly defined [format](http://www.joda.org/joda-time/apidocs/org/joda/ti
> You need to include the [`druid-avro-extensions`](../development/extensions-core/avro.md) as an extension to use the Avro Stream Parser.
> See the [Avro Types](../development/extensions-core/avro.md#avro-types) section for how Avro types are handled in Druid
This parser is for [stream ingestion](./index.md#streaming) and reads Avro data from a stream directly.
| Field | Type | Description | Required |

View File

@ -25,7 +25,9 @@ import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import com.jayway.jsonpath.spi.json.JsonProvider;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericEnumSymbol;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.druid.java.util.common.StringUtils;
@ -56,7 +58,9 @@ public class AvroFlattenerMaker implements ObjectFlatteners.FlattenerMaker<Gener
Schema.Type.INT,
Schema.Type.LONG,
Schema.Type.FLOAT,
Schema.Type.DOUBLE
Schema.Type.DOUBLE,
Schema.Type.ENUM,
Schema.Type.FIXED
);
private static boolean isPrimitive(Schema schema)
@ -137,9 +141,15 @@ public class AvroFlattenerMaker implements ObjectFlatteners.FlattenerMaker<Gener
return AVRO_JSON_PROVIDER;
}
@Override
public Object finalizeConversionForMap(Object o)
{
return transformValue(o);
}
private Object transformValue(final Object field)
{
if (fromPigAvroStorage && field instanceof GenericData.Array) {
if (fromPigAvroStorage && field instanceof GenericArray) {
return Lists.transform((List) field, item -> String.valueOf(((GenericRecord) item).get(0)));
}
if (field instanceof ByteBuffer) {
@ -152,6 +162,14 @@ public class AvroFlattenerMaker implements ObjectFlatteners.FlattenerMaker<Gener
return field.toString();
} else if (field instanceof List) {
return ((List<?>) field).stream().filter(Objects::nonNull).collect(Collectors.toList());
} else if (field instanceof GenericEnumSymbol) {
return field.toString();
} else if (field instanceof GenericFixed) {
if (binaryAsString) {
return StringUtils.fromUtf8(((GenericFixed) field).bytes());
} else {
return ((GenericFixed) field).bytes();
}
}
return field;
}

View File

@ -19,10 +19,8 @@
package org.apache.druid.data.input;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.common.base.Function;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
@ -40,6 +38,7 @@ import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.data.input.schemarepo.Avro1124RESTRepositoryClientWrapper;
import org.apache.druid.data.input.schemarepo.Avro1124SubjectAndIdConverter;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
@ -90,10 +89,12 @@ public class AvroStreamInputRowParserTest
"someStringArray",
"someIntArray",
"someFloat",
"someUnion",
EVENT_TYPE,
ID,
"someFixed",
"someBytes",
"someUnion",
ID,
"someEnum",
"someLong",
"someInt",
"timestamp"
@ -158,14 +159,12 @@ public class AvroStreamInputRowParserTest
.build());
private static final Pattern BRACES_AND_SPACE = Pattern.compile("[{} ]");
private final ObjectMapper jsonMapper = new ObjectMapper();
private final ObjectMapper jsonMapper = new DefaultObjectMapper();
@Before
public void before()
{
jsonMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
jsonMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
for (Module jacksonModule : new AvroExtensionsModule().getJacksonModules()) {
jsonMapper.registerModule(jacksonModule);
}
@ -335,7 +334,10 @@ public class AvroStreamInputRowParserTest
);
Assert.assertEquals(Collections.singletonList(SOME_UNION_VALUE), inputRow.getDimension("someUnion"));
Assert.assertEquals(Collections.emptyList(), inputRow.getDimension("someNull"));
Assert.assertEquals(SOME_FIXED_VALUE, inputRow.getRaw("someFixed"));
Assert.assertEquals(
Arrays.toString(SOME_FIXED_VALUE.bytes()),
Arrays.toString((byte[]) (inputRow.getRaw("someFixed")))
);
Assert.assertEquals(
Arrays.toString(SOME_BYTES_VALUE.array()),
Arrays.toString((byte[]) (inputRow.getRaw("someBytes")))

View File

@ -80,7 +80,8 @@ public class AvroFlattenerMakerTest
flattener.getRootField(record, "someNull")
);
Assert.assertEquals(
record.getSomeFixed(),
// Casted to an array by transformValue
record.getSomeFixed().bytes(),
flattener.getRootField(record, "someFixed")
);
Assert.assertEquals(
@ -89,7 +90,8 @@ public class AvroFlattenerMakerTest
flattener.getRootField(record, "someBytes")
);
Assert.assertEquals(
record.getSomeEnum(),
// Casted to a string by transformValue
record.getSomeEnum().toString(),
flattener.getRootField(record, "someEnum")
);
Assert.assertEquals(
@ -165,7 +167,8 @@ public class AvroFlattenerMakerTest
flattener.makeJsonPathExtractor("$.someNull").apply(record)
);
Assert.assertEquals(
record.getSomeFixed(),
// Casted to an array by transformValue
record.getSomeFixed().bytes(),
flattener.makeJsonPathExtractor("$.someFixed").apply(record)
);
Assert.assertEquals(
@ -174,7 +177,8 @@ public class AvroFlattenerMakerTest
flattener.makeJsonPathExtractor("$.someBytes").apply(record)
);
Assert.assertEquals(
record.getSomeEnum(),
// Casted to a string by transformValue
record.getSomeEnum().toString(),
flattener.makeJsonPathExtractor("$.someEnum").apply(record)
);
Assert.assertEquals(

View File

@ -29,6 +29,7 @@ import org.apache.druid.data.input.AvroHadoopInputRowParserTest;
import org.apache.druid.data.input.AvroStreamInputRowParserTest;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.FileEntity;
@ -128,18 +129,66 @@ public class AvroOCFReaderTest
}
}
@Test
public void testSample() throws Exception
{
final ObjectMapper mapper = new DefaultObjectMapper();
mapper.setInjectableValues(
new InjectableValues.Std().addValue(ObjectMapper.class, mapper)
);
final InputEntityReader reader = createReader(mapper, null);
try (CloseableIterator<InputRowListPlusRawValues> iterator = reader.sample()) {
Assert.assertTrue(iterator.hasNext());
final InputRowListPlusRawValues row = iterator.next();
Assert.assertFalse(iterator.hasNext());
final Map<String, Object> rawColumns = row.getRawValues();
Assert.assertNotNull(rawColumns);
Assert.assertEquals(19, rawColumns.size());
final List<InputRow> inputRows = row.getInputRows();
Assert.assertNotNull(inputRows);
final InputRow inputRow = Iterables.getOnlyElement(inputRows);
assertInputRow(inputRow);
}
}
@Test
public void testSampleSerdeRaw() throws Exception
{
final ObjectMapper mapper = new DefaultObjectMapper();
mapper.setInjectableValues(
new InjectableValues.Std().addValue(ObjectMapper.class, mapper)
);
final InputEntityReader reader = createReader(mapper, null);
try (CloseableIterator<InputRowListPlusRawValues> iterator = reader.sample()) {
Assert.assertTrue(iterator.hasNext());
final InputRowListPlusRawValues row = iterator.next();
Assert.assertFalse(iterator.hasNext());
final List<InputRow> inputRows = row.getInputRows();
Assert.assertNotNull(inputRows);
final InputRow inputRow = Iterables.getOnlyElement(inputRows);
assertInputRow(inputRow);
// Ensure the raw values can be serialised into JSON
mapper.writeValueAsString(row.getRawValues());
}
}
private void assertRow(InputEntityReader reader) throws IOException
{
try (CloseableIterator<InputRow> iterator = reader.read()) {
Assert.assertTrue(iterator.hasNext());
final InputRow row = iterator.next();
Assert.assertEquals(DateTimes.of("2015-10-25T19:30:00.000Z"), row.getTimestamp());
Assert.assertEquals("type-a", Iterables.getOnlyElement(row.getDimension("eventType")));
Assert.assertEquals(679865987569912369L, row.getMetric("someLong"));
assertInputRow(row);
Assert.assertFalse(iterator.hasNext());
}
}
private void assertInputRow(InputRow row)
{
Assert.assertEquals(DateTimes.of("2015-10-25T19:30:00.000Z"), row.getTimestamp());
Assert.assertEquals("type-a", Iterables.getOnlyElement(row.getDimension("eventType")));
Assert.assertEquals(679865987569912369L, row.getMetric("someLong"));
}
private InputEntityReader createReader(
ObjectMapper mapper,
Map<String, Object> readerSchema

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
import { cleanSpec, downgradeSpec, upgradeSpec } from './ingestion-spec';
import { cleanSpec, downgradeSpec, guessInputFormat, upgradeSpec } from './ingestion-spec';
describe('ingestion-spec', () => {
const oldSpec = {
@ -120,4 +120,35 @@ describe('ingestion-spec', () => {
},
});
});
describe('guessInputFormat', () => {
it('works for parquet', () => {
expect(guessInputFormat(['PAR1lol']).type).toEqual('parquet');
});
it('works for orc', () => {
expect(guessInputFormat(['ORClol']).type).toEqual('orc');
});
it('works for AVRO', () => {
expect(guessInputFormat(['Obj\x01lol']).type).toEqual('avro_ocf');
expect(guessInputFormat(['Obj1lol']).type).toEqual('regex');
});
it('works for JSON', () => {
expect(guessInputFormat(['{"a":1}']).type).toEqual('json');
});
it('works for TSV', () => {
expect(guessInputFormat(['A\tB\tX\tY']).type).toEqual('tsv');
});
it('works for CSV', () => {
expect(guessInputFormat(['A,B,X,Y']).type).toEqual('csv');
});
it('works for regex', () => {
expect(guessInputFormat(['A|B|X|Y']).type).toEqual('regex');
});
});
});

View File

@ -2656,7 +2656,7 @@ export function fillInputFormat(spec: IngestionSpec, sampleData: string[]): Inge
return deepSet(spec, 'spec.ioConfig.inputFormat', guessInputFormat(sampleData));
}
function guessInputFormat(sampleData: string[]): InputFormat {
export function guessInputFormat(sampleData: string[]): InputFormat {
let sampleDatum = sampleData[0];
if (sampleDatum) {
sampleDatum = String(sampleDatum); // Really ensure it is a string
@ -2672,7 +2672,7 @@ function guessInputFormat(sampleData: string[]): InputFormat {
return inputFormatFromType('orc');
}
// Avro OCF 4 byte magic header: https://avro.apache.org/docs/current/spec.html#Object+Container+Files
if (sampleDatum.startsWith('Obj1')) {
if (sampleDatum.startsWith('Obj') && sampleDatum.charCodeAt(3) === 1) {
return inputFormatFromType('avro_ocf');
}

View File

@ -1235,9 +1235,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
fillDataSourceNameIfNeeded(
fillInputFormat(
spec,
filterMap(inputQueryState.data.data, l =>
l.parsed ? l.parsed.raw : undefined,
),
filterMap(inputQueryState.data.data, l => (l.input ? l.input.raw : undefined)),
),
),
);

View File

@ -613,6 +613,7 @@ Avro-1124
SchemaRepo
avro
avroBytesDecoder
flattenSpec
jq
org.apache.druid.extensions
schemaRepository