Fix avro json serde issues (#11455)

Clint Wylie authored 2021-07-19 09:32:05 -07:00, committed by GitHub
parent 8729b40893
commit 2705fe98fa
8 changed files with 284 additions and 8 deletions
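The eight diffs below address one underlying bug: binaryAsString, extractUnionsByType, and (in AvroOCFInputFormat) the raw schema map were bound from JSON by the @JsonCreator constructors but had no @JsonProperty accessors, so Jackson omitted them when writing a spec back to JSON, and a write-then-read round trip silently reset them to their defaults. The main-source changes add the annotated getters and widen equals/hashCode/toString to cover the fields; the test changes add JSON round-trip coverage that fails without the getters. A minimal standalone sketch of the failure mode, assuming Jackson's default visibility rules (class and field names here are illustrative, not Druid code):

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class SerdeRoundTripSketch
{
  // Same shape as the broken classes: the creator binds "binaryAsString"
  // from JSON, but only "parseSpec" is exposed for serialization.
  static class BrokenSpec
  {
    final String parseSpec;
    final boolean binaryAsString;

    @JsonCreator
    BrokenSpec(
        @JsonProperty("parseSpec") String parseSpec,
        @JsonProperty("binaryAsString") boolean binaryAsString
    )
    {
      this.parseSpec = parseSpec;
      this.binaryAsString = binaryAsString;
    }

    @JsonProperty
    public String getParseSpec()
    {
      return parseSpec;
    }

    // The missing piece, added by this commit to the real classes:
    // @JsonProperty
    // public Boolean getBinaryAsString() { return binaryAsString; }
  }

  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    String json = mapper.writeValueAsString(new BrokenSpec("spec", true));
    System.out.println(json); // {"parseSpec":"spec"} -- the flag was dropped
    BrokenSpec roundTripped = mapper.readValue(json, BrokenSpec.class);
    System.out.println(roundTripped.binaryAsString); // false -- silently reset
  }
}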

AvroHadoopInputRowParser.java

@@ -29,6 +29,7 @@ import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.java.util.common.parsers.ObjectFlattener;
import java.util.List;
import java.util.Objects;
public class AvroHadoopInputRowParser implements InputRowParser<GenericRecord>
{
@@ -74,9 +75,54 @@ public class AvroHadoopInputRowParser implements InputRowParser<GenericRecord>
return fromPigAvroStorage;
}
@JsonProperty
public Boolean getBinaryAsString()
{
return binaryAsString;
}
@JsonProperty
public Boolean isExtractUnionsByType()
{
return extractUnionsByType;
}
@Override
public InputRowParser withParseSpec(ParseSpec parseSpec)
{
return new AvroHadoopInputRowParser(parseSpec, fromPigAvroStorage, binaryAsString, extractUnionsByType);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
AvroHadoopInputRowParser that = (AvroHadoopInputRowParser) o;
return fromPigAvroStorage == that.fromPigAvroStorage
&& binaryAsString == that.binaryAsString
&& extractUnionsByType == that.extractUnionsByType
&& Objects.equals(parseSpec, that.parseSpec);
}
@Override
public int hashCode()
{
return Objects.hash(parseSpec, fromPigAvroStorage, binaryAsString, extractUnionsByType);
}
@Override
public String toString()
{
return "AvroHadoopInputRowParser{" +
"parseSpec=" + parseSpec +
", fromPigAvroStorage=" + fromPigAvroStorage +
", binaryAsString=" + binaryAsString +
", extractUnionsByType=" + extractUnionsByType +
'}';
}
}

AvroStreamInputRowParser.java

@@ -77,6 +77,18 @@ public class AvroStreamInputRowParser implements ByteBufferInputRowParser
return avroBytesDecoder;
}
@JsonProperty
public Boolean getBinaryAsString()
{
return binaryAsString;
}
@JsonProperty
public Boolean isExtractUnionsByType()
{
return extractUnionsByType;
}
@Override
public ByteBufferInputRowParser withParseSpec(ParseSpec parseSpec)
{
@@ -99,12 +111,25 @@ public class AvroStreamInputRowParser implements ByteBufferInputRowParser
}
final AvroStreamInputRowParser that = (AvroStreamInputRowParser) o;
return Objects.equals(parseSpec, that.parseSpec) &&
- Objects.equals(avroBytesDecoder, that.avroBytesDecoder);
+ Objects.equals(avroBytesDecoder, that.avroBytesDecoder) &&
+ Objects.equals(binaryAsString, that.binaryAsString) &&
+ Objects.equals(extractUnionsByType, that.extractUnionsByType);
}
@Override
public int hashCode()
{
- return Objects.hash(parseSpec, avroBytesDecoder);
+ return Objects.hash(parseSpec, avroBytesDecoder, binaryAsString, extractUnionsByType);
}
@Override
public String toString()
{
return "AvroStreamInputRowParser{" +
"parseSpec=" + parseSpec +
", binaryAsString=" + binaryAsString +
", extractUnionsByType=" + extractUnionsByType +
", avroBytesDecoder=" + avroBytesDecoder +
'}';
}
}

AvroOCFInputFormat.java

@@ -43,6 +43,8 @@ public class AvroOCFInputFormat extends NestedInputFormat
private final boolean binaryAsString;
private final boolean extractUnionsByType;
private final Map<String, Object> schema;
@Nullable
private final Schema readerSchema;
@@ -56,6 +58,7 @@
) throws Exception
{
super(flattenSpec);
this.schema = schema;
// If a reader schema is supplied, create the datum reader with that schema; otherwise use the writer schema
if (schema != null) {
String schemaStr = mapper.writeValueAsString(schema);
@@ -76,6 +79,24 @@
return false;
}
@JsonProperty
public Map<String, Object> getSchema()
{
return schema;
}
@JsonProperty
public Boolean getBinaryAsString()
{
return binaryAsString;
}
@JsonProperty
public Boolean isExtractUnionsByType()
{
return extractUnionsByType;
}
@Override
public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
{
@@ -103,13 +124,23 @@
return false;
}
AvroOCFInputFormat that = (AvroOCFInputFormat) o;
- return binaryAsString == that.binaryAsString &&
+ return binaryAsString == that.binaryAsString && extractUnionsByType == that.extractUnionsByType &&
Objects.equals(readerSchema, that.readerSchema);
}
@Override
public int hashCode()
{
- return Objects.hash(super.hashCode(), binaryAsString, readerSchema);
+ return Objects.hash(super.hashCode(), binaryAsString, readerSchema, extractUnionsByType);
}
@Override
public String toString()
{
return "AvroOCFInputFormat{" +
"binaryAsString=" + binaryAsString +
", extractUnionsByType=" + extractUnionsByType +
", readerSchema=" + readerSchema +
'}';
}
}
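
The constructor comment above ("if a reader schema is supplied... otherwise use the writer schema") refers to standard Avro schema resolution. A minimal sketch of the two paths outside Druid, with assumed file names: given a reader schema, each record written under the file's writer schema is resolved against it (aliases map renamed fields, defaults fill added ones; the reader schema in the new AvroOCFInputFormatTest uses an alias for exactly this kind of rename), while with no reader schema records are decoded exactly as written.

import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class ReaderSchemaSketch
{
  public static void main(String[] args) throws Exception
  {
    // Assumed inputs: an Avro OCF data file plus an optional reader schema.
    File ocfFile = new File("data.avro");
    Schema readerSchema = new Schema.Parser().parse(new File("reader.avsc"));

    // The writer schema comes from the OCF header; supplying a reader schema
    // makes Avro resolve each record against it. Using the no-arg
    // new GenericDatumReader<>() instead reads records exactly as written.
    GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(null, readerSchema);
    try (DataFileReader<GenericRecord> fileReader = new DataFileReader<>(ocfFile, datumReader)) {
      while (fileReader.hasNext()) {
        GenericRecord record = fileReader.next();
        System.out.println(record);
      }
    }
  }
}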

AvroStreamInputFormat.java

@@ -71,6 +71,12 @@ public class AvroStreamInputFormat extends NestedInputFormat
return binaryAsString;
}
@JsonProperty
public Boolean isExtractUnionsByType()
{
return extractUnionsByType;
}
@Override
public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
{
@@ -95,13 +101,14 @@
}
final AvroStreamInputFormat that = (AvroStreamInputFormat) o;
return Objects.equals(getFlattenSpec(), that.getFlattenSpec()) &&
- Objects.equals(avroBytesDecoder, that.avroBytesDecoder) &&
- Objects.equals(binaryAsString, that.binaryAsString);
+ Objects.equals(avroBytesDecoder, that.avroBytesDecoder) &&
+ Objects.equals(binaryAsString, that.binaryAsString) &&
+ Objects.equals(extractUnionsByType, that.extractUnionsByType);
}
@Override
public int hashCode()
{
- return Objects.hash(getFlattenSpec(), avroBytesDecoder, binaryAsString);
+ return Objects.hash(getFlattenSpec(), avroBytesDecoder, binaryAsString, extractUnionsByType);
}
}

AvroHadoopInputRowParserTest.java

@@ -29,6 +29,7 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.druid.data.input.avro.AvroExtensionsModule;
import org.apache.druid.java.util.common.FileUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -48,6 +49,28 @@ public class AvroHadoopInputRowParserTest
}
}
@Test
public void testSerde() throws IOException
{
AvroHadoopInputRowParser parser = new AvroHadoopInputRowParser(AvroStreamInputRowParserTest.PARSE_SPEC, false, false, false);
AvroHadoopInputRowParser parser2 = jsonMapper.readValue(
jsonMapper.writeValueAsBytes(parser),
AvroHadoopInputRowParser.class
);
Assert.assertEquals(parser, parser2);
}
@Test
public void testSerdeNonDefaults() throws IOException
{
AvroHadoopInputRowParser parser = new AvroHadoopInputRowParser(AvroStreamInputRowParserTest.PARSE_SPEC, true, true, true);
AvroHadoopInputRowParser parser2 = jsonMapper.readValue(
jsonMapper.writeValueAsBytes(parser),
AvroHadoopInputRowParser.class
);
Assert.assertEquals(parser, parser2);
}
@Test
public void testParseNotFromPigAvroStorage() throws IOException
{
@@ -67,6 +90,7 @@ public class AvroHadoopInputRowParserTest
jsonMapper.writeValueAsBytes(parser),
AvroHadoopInputRowParser.class
);
Assert.assertEquals(parser, parser2);
InputRow inputRow = parser2.parseBatch(record).get(0);
AvroStreamInputRowParserTest.assertInputRowCorrect(inputRow, AvroStreamInputRowParserTest.DIMENSIONS, fromPigAvroStorage);
}

AvroStreamInputFormatTest.java

@@ -64,7 +64,6 @@ import static org.apache.druid.data.input.AvroStreamInputRowParserTest.buildSome
public class AvroStreamInputFormatTest
{
private static final String EVENT_TYPE = "eventType";
private static final String ID = "id";
private static final String SOME_OTHER_ID = "someOtherId";
@@ -129,6 +128,24 @@ public class AvroStreamInputFormatTest
Assert.assertEquals(inputFormat, inputFormat2);
}
@Test
public void testSerdeNonDefault() throws IOException
{
Repository repository = new Avro1124RESTRepositoryClientWrapper("http://github.io");
AvroStreamInputFormat inputFormat = new AvroStreamInputFormat(
flattenSpec,
new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository),
true,
true
);
NestedInputFormat inputFormat2 = jsonMapper.readValue(
jsonMapper.writeValueAsString(inputFormat),
NestedInputFormat.class
);
Assert.assertEquals(inputFormat, inputFormat2);
}
@Test
public void testSerdeForSchemaRegistry() throws IOException
{

AvroStreamInputRowParserTest.java

@@ -189,6 +189,24 @@ public class AvroStreamInputRowParserTest
Assert.assertEquals(parser, parser2);
}
@Test
public void testSerdeNonDefault() throws IOException
{
Repository repository = new Avro1124RESTRepositoryClientWrapper("http://github.io");
AvroStreamInputRowParser parser = new AvroStreamInputRowParser(
PARSE_SPEC,
new SchemaRepoBasedAvroBytesDecoder<>(new Avro1124SubjectAndIdConverter(TOPIC), repository),
true,
true
);
ByteBufferInputRowParser parser2 = jsonMapper.readValue(
jsonMapper.writeValueAsString(parser),
ByteBufferInputRowParser.class
);
Assert.assertEquals(parser, parser2);
}
@Test
public void testParse() throws SchemaValidationException, IOException
{

AvroOCFInputFormatTest.java (new file)

@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.avro;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.impl.NestedInputFormat;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Map;
public class AvroOCFInputFormatTest
{
private final ObjectMapper jsonMapper = new DefaultObjectMapper();
private JSONPathSpec flattenSpec;
@Before
public void before()
{
flattenSpec = new JSONPathSpec(
true,
ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.PATH, "nested", "someRecord.subLong")
)
);
for (Module jacksonModule : new AvroExtensionsModule().getJacksonModules()) {
jsonMapper.registerModule(jacksonModule);
}
jsonMapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, jsonMapper));
}
@Test
public void testSerde() throws Exception
{
AvroOCFInputFormat inputFormat = new AvroOCFInputFormat(
jsonMapper,
flattenSpec,
null,
false,
false
);
NestedInputFormat inputFormat2 = jsonMapper.readValue(
jsonMapper.writeValueAsString(inputFormat),
NestedInputFormat.class
);
Assert.assertEquals(inputFormat, inputFormat2);
}
@Test
public void testSerdeNonDefaults() throws Exception
{
String schemaStr = "{\n"
+ " \"namespace\": \"org.apache.druid.data.input\",\n"
+ " \"name\": \"SomeAvroDatum\",\n"
+ " \"type\": \"record\",\n"
+ " \"fields\" : [\n"
+ " {\"name\":\"timestamp\",\"type\":\"long\"},\n"
+ " {\"name\":\"someLong\",\"type\":\"long\"}\n,"
+ " {\"name\":\"eventClass\",\"type\":\"string\", \"aliases\": [\"eventType\"]}\n"
+ " ]\n"
+ "}";
TypeReference<Map<String, Object>> typeRef = new TypeReference<Map<String, Object>>()
{
};
final Map<String, Object> readerSchema = jsonMapper.readValue(schemaStr, typeRef);
AvroOCFInputFormat inputFormat = new AvroOCFInputFormat(
jsonMapper,
flattenSpec,
readerSchema,
true,
true
);
NestedInputFormat inputFormat2 = jsonMapper.readValue(
jsonMapper.writeValueAsString(inputFormat),
NestedInputFormat.class
);
Assert.assertEquals(inputFormat, inputFormat2);
}
}