From 2705fe98fa4513c506ac4ffb4c71fc2be51f7738 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Mon, 19 Jul 2021 09:32:05 -0700 Subject: [PATCH] Fix avro json serde issues (#11455) --- .../data/input/AvroHadoopInputRowParser.java | 46 ++++++++ .../data/input/AvroStreamInputRowParser.java | 29 ++++- .../data/input/avro/AvroOCFInputFormat.java | 35 +++++- .../input/avro/AvroStreamInputFormat.java | 13 ++- .../input/AvroHadoopInputRowParserTest.java | 24 ++++ .../data/input/AvroStreamInputFormatTest.java | 19 ++- .../input/AvroStreamInputRowParserTest.java | 18 +++ .../input/avro/AvroOCFInputFormatTest.java | 108 ++++++++++++++++++ 8 files changed, 284 insertions(+), 8 deletions(-) create mode 100644 extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFInputFormatTest.java diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java index 165f6a40a6e..39bdc87cc5b 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java @@ -29,6 +29,7 @@ import org.apache.druid.data.input.impl.ParseSpec; import org.apache.druid.java.util.common.parsers.ObjectFlattener; import java.util.List; +import java.util.Objects; public class AvroHadoopInputRowParser implements InputRowParser { @@ -74,9 +75,54 @@ public class AvroHadoopInputRowParser implements InputRowParser return fromPigAvroStorage; } + @JsonProperty + public Boolean getBinaryAsString() + { + return binaryAsString; + } + + @JsonProperty + public Boolean isExtractUnionsByType() + { + return extractUnionsByType; + } + @Override public InputRowParser withParseSpec(ParseSpec parseSpec) { return new AvroHadoopInputRowParser(parseSpec, fromPigAvroStorage, binaryAsString, extractUnionsByType); } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + AvroHadoopInputRowParser that = (AvroHadoopInputRowParser) o; + return fromPigAvroStorage == that.fromPigAvroStorage + && binaryAsString == that.binaryAsString + && extractUnionsByType == that.extractUnionsByType + && Objects.equals(parseSpec, that.parseSpec); + } + + @Override + public int hashCode() + { + return Objects.hash(parseSpec, fromPigAvroStorage, binaryAsString, extractUnionsByType); + } + + @Override + public String toString() + { + return "AvroHadoopInputRowParser{" + + "parseSpec=" + parseSpec + + ", fromPigAvroStorage=" + fromPigAvroStorage + + ", binaryAsString=" + binaryAsString + + ", extractUnionsByType=" + extractUnionsByType + + '}'; + } } diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java index fcb8bd83322..969bf259b44 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java @@ -77,6 +77,18 @@ public class AvroStreamInputRowParser implements ByteBufferInputRowParser return avroBytesDecoder; } + @JsonProperty + public Boolean getBinaryAsString() + { + return binaryAsString; + } + + @JsonProperty + public Boolean isExtractUnionsByType() + { + return extractUnionsByType; + } + @Override public ByteBufferInputRowParser withParseSpec(ParseSpec parseSpec) { @@ -99,12 +111,25 @@ public class AvroStreamInputRowParser implements ByteBufferInputRowParser } final AvroStreamInputRowParser that = (AvroStreamInputRowParser) o; return Objects.equals(parseSpec, that.parseSpec) && - Objects.equals(avroBytesDecoder, that.avroBytesDecoder); + Objects.equals(avroBytesDecoder, that.avroBytesDecoder) && + Objects.equals(binaryAsString, that.binaryAsString) && + Objects.equals(extractUnionsByType, that.extractUnionsByType); } @Override public int hashCode() { - return Objects.hash(parseSpec, avroBytesDecoder); + return Objects.hash(parseSpec, avroBytesDecoder, binaryAsString, extractUnionsByType); + } + + @Override + public String toString() + { + return "AvroStreamInputRowParser{" + + "parseSpec=" + parseSpec + + ", binaryAsString=" + binaryAsString + + ", extractUnionsByType=" + extractUnionsByType + + ", avroBytesDecoder=" + avroBytesDecoder + + '}'; } } diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFInputFormat.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFInputFormat.java index 7417477ea50..f5ea6a34d4e 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFInputFormat.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFInputFormat.java @@ -43,6 +43,8 @@ public class AvroOCFInputFormat extends NestedInputFormat private final boolean binaryAsString; private final boolean extractUnionsByType; + private final Map schema; + @Nullable private final Schema readerSchema; @@ -56,6 +58,7 @@ public class AvroOCFInputFormat extends NestedInputFormat ) throws Exception { super(flattenSpec); + this.schema = schema; // If a reader schema is supplied create the datum reader with said schema, otherwise use the writer schema if (schema != null) { String schemaStr = mapper.writeValueAsString(schema); @@ -76,6 +79,24 @@ public class AvroOCFInputFormat extends NestedInputFormat return false; } + @JsonProperty + public Map getSchema() + { + return schema; + } + + @JsonProperty + public Boolean getBinaryAsString() + { + return binaryAsString; + } + + @JsonProperty + public Boolean isExtractUnionsByType() + { + return extractUnionsByType; + } + @Override public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) { @@ -103,13 +124,23 @@ public class AvroOCFInputFormat extends NestedInputFormat return false; } AvroOCFInputFormat that = (AvroOCFInputFormat) o; - return binaryAsString == that.binaryAsString && + return binaryAsString == that.binaryAsString && extractUnionsByType == that.extractUnionsByType && Objects.equals(readerSchema, that.readerSchema); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), binaryAsString, readerSchema); + return Objects.hash(super.hashCode(), binaryAsString, readerSchema, extractUnionsByType); + } + + @Override + public String toString() + { + return "AvroOCFInputFormat{" + + "binaryAsString=" + binaryAsString + + ", extractUnionsByType=" + extractUnionsByType + + ", readerSchema=" + readerSchema + + '}'; } } diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java index d42f3593da6..bf0cecfa321 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamInputFormat.java @@ -71,6 +71,12 @@ public class AvroStreamInputFormat extends NestedInputFormat return binaryAsString; } + @JsonProperty + public Boolean isExtractUnionsByType() + { + return extractUnionsByType; + } + @Override public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) { @@ -95,13 +101,14 @@ public class AvroStreamInputFormat extends NestedInputFormat } final AvroStreamInputFormat that = (AvroStreamInputFormat) o; return Objects.equals(getFlattenSpec(), that.getFlattenSpec()) && - Objects.equals(avroBytesDecoder, that.avroBytesDecoder) && - Objects.equals(binaryAsString, that.binaryAsString); + Objects.equals(avroBytesDecoder, that.avroBytesDecoder) && + Objects.equals(binaryAsString, that.binaryAsString) && + Objects.equals(extractUnionsByType, that.extractUnionsByType); } @Override public int hashCode() { - return Objects.hash(getFlattenSpec(), avroBytesDecoder, binaryAsString); + return Objects.hash(getFlattenSpec(), avroBytesDecoder, binaryAsString, extractUnionsByType); } } diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java index c6ade851b56..12791b548a6 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroHadoopInputRowParserTest.java @@ -29,6 +29,7 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.specific.SpecificDatumWriter; import org.apache.druid.data.input.avro.AvroExtensionsModule; import org.apache.druid.java.util.common.FileUtils; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -48,6 +49,28 @@ public class AvroHadoopInputRowParserTest } } + @Test + public void testSerde() throws IOException + { + AvroHadoopInputRowParser parser = new AvroHadoopInputRowParser(AvroStreamInputRowParserTest.PARSE_SPEC, false, false, false); + AvroHadoopInputRowParser parser2 = jsonMapper.readValue( + jsonMapper.writeValueAsBytes(parser), + AvroHadoopInputRowParser.class + ); + Assert.assertEquals(parser, parser2); + } + + @Test + public void testSerdeNonDefaults() throws IOException + { + AvroHadoopInputRowParser parser = new AvroHadoopInputRowParser(AvroStreamInputRowParserTest.PARSE_SPEC, true, true, true); + AvroHadoopInputRowParser parser2 = jsonMapper.readValue( + jsonMapper.writeValueAsBytes(parser), + AvroHadoopInputRowParser.class + ); + Assert.assertEquals(parser, parser2); + } + @Test public void testParseNotFromPigAvroStorage() throws IOException { @@ -67,6 +90,7 @@ public class AvroHadoopInputRowParserTest jsonMapper.writeValueAsBytes(parser), AvroHadoopInputRowParser.class ); + Assert.assertEquals(parser, parser2); InputRow inputRow = parser2.parseBatch(record).get(0); AvroStreamInputRowParserTest.assertInputRowCorrect(inputRow, AvroStreamInputRowParserTest.DIMENSIONS, fromPigAvroStorage); } diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java index 918aa0abb51..4213008ba2b 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputFormatTest.java @@ -64,7 +64,6 @@ import static org.apache.druid.data.input.AvroStreamInputRowParserTest.buildSome public class AvroStreamInputFormatTest { - private static final String EVENT_TYPE = "eventType"; private static final String ID = "id"; private static final String SOME_OTHER_ID = "someOtherId"; @@ -129,6 +128,24 @@ public class AvroStreamInputFormatTest Assert.assertEquals(inputFormat, inputFormat2); } + @Test + public void testSerdeNonDefault() throws IOException + { + Repository repository = new Avro1124RESTRepositoryClientWrapper("http://github.io"); + AvroStreamInputFormat inputFormat = new AvroStreamInputFormat( + flattenSpec, + new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository), + true, + true + ); + NestedInputFormat inputFormat2 = jsonMapper.readValue( + jsonMapper.writeValueAsString(inputFormat), + NestedInputFormat.class + ); + + Assert.assertEquals(inputFormat, inputFormat2); + } + @Test public void testSerdeForSchemaRegistry() throws IOException { diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java index 8ed9f44fc06..69608db5b42 100644 --- a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/AvroStreamInputRowParserTest.java @@ -189,6 +189,24 @@ public class AvroStreamInputRowParserTest Assert.assertEquals(parser, parser2); } + @Test + public void testSerdeNonDefault() throws IOException + { + Repository repository = new Avro1124RESTRepositoryClientWrapper("http://github.io"); + AvroStreamInputRowParser parser = new AvroStreamInputRowParser( + PARSE_SPEC, + new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository), + true, + true + ); + ByteBufferInputRowParser parser2 = jsonMapper.readValue( + jsonMapper.writeValueAsString(parser), + ByteBufferInputRowParser.class + ); + + Assert.assertEquals(parser, parser2); + } + @Test public void testParse() throws SchemaValidationException, IOException { diff --git a/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFInputFormatTest.java b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFInputFormatTest.java new file mode 100644 index 00000000000..a5c40a78cc0 --- /dev/null +++ b/extensions-core/avro-extensions/src/test/java/org/apache/druid/data/input/avro/AvroOCFInputFormatTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.avro; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import org.apache.druid.data.input.impl.NestedInputFormat; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; +import org.apache.druid.java.util.common.parsers.JSONPathFieldType; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Map; + +public class AvroOCFInputFormatTest +{ + private final ObjectMapper jsonMapper = new DefaultObjectMapper(); + private JSONPathSpec flattenSpec; + + @Before + public void before() + { + flattenSpec = new JSONPathSpec( + true, + ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.PATH, "nested", "someRecord.subLong") + ) + ); + for (Module jacksonModule : new AvroExtensionsModule().getJacksonModules()) { + jsonMapper.registerModule(jacksonModule); + } + jsonMapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, jsonMapper)); + } + + @Test + public void testSerde() throws Exception + { + AvroOCFInputFormat inputFormat = new AvroOCFInputFormat( + jsonMapper, + flattenSpec, + null, + false, + false + ); + NestedInputFormat inputFormat2 = jsonMapper.readValue( + jsonMapper.writeValueAsString(inputFormat), + NestedInputFormat.class + ); + + Assert.assertEquals(inputFormat, inputFormat2); + } + + @Test + public void testSerdeNonDefaults() throws Exception + { + String schemaStr = "{\n" + + " \"namespace\": \"org.apache.druid.data.input\",\n" + + " \"name\": \"SomeAvroDatum\",\n" + + " \"type\": \"record\",\n" + + " \"fields\" : [\n" + + " {\"name\":\"timestamp\",\"type\":\"long\"},\n" + + " {\"name\":\"someLong\",\"type\":\"long\"}\n," + + " {\"name\":\"eventClass\",\"type\":\"string\", \"aliases\": [\"eventType\"]}\n" + + " ]\n" + + "}"; + + TypeReference> typeRef = new TypeReference>() + { + }; + final Map readerSchema = jsonMapper.readValue(schemaStr, typeRef); + AvroOCFInputFormat inputFormat = new AvroOCFInputFormat( + jsonMapper, + flattenSpec, + readerSchema, + true, + true + ); + NestedInputFormat inputFormat2 = jsonMapper.readValue( + jsonMapper.writeValueAsString(inputFormat), + NestedInputFormat.class + ); + + Assert.assertEquals(inputFormat, inputFormat2); + } +}