Stop leaking Avro objects from parser (#12828)

The Avro parsing code leaks Avro-specific "object" representations
(such as GenericRecord) out of the flattener. Convert them into
plain Java Maps and Lists so that downstream code can consume them
without knowing anything about Avro types. Previously, these
objects were only usable via .toString(), which is not a workable
contract for structured values.
imply-cheddar 2022-08-17 14:46:20 -07:00 committed by GitHub
parent 752e42a312
commit 536415b948
3 changed files with 97 additions and 7 deletions
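
For concreteness, here is a minimal sketch of the contract change described in the commit message. The class name and setup are illustrative only; the "subInt"/"subLong" fields mirror the test fixture used further down.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class ContractChangeSketch
{
  public static void main(String[] args)
  {
    // A record shaped like the "someRecord" test fixture.
    Schema schema = SchemaBuilder.record("someRecord")
                                 .fields()
                                 .requiredInt("subInt")
                                 .requiredLong("subLong")
                                 .endRecord();
    GenericRecord rec = new GenericData.Record(schema);
    rec.put("subInt", 4892);
    rec.put("subLong", 1543698L);

    // Before this patch the flattener passed the GenericRecord through untouched, so the only
    // reliable thing downstream code could do with it was call toString():
    System.out.println(rec);  // {"subInt": 4892, "subLong": 1543698}

    // After this patch the same value reaches consumers as a plain java.util.LinkedHashMap,
    // e.g. {subInt=4892, subLong=1543698}, which they can index by field name instead of
    // parsing a string.
  }
}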

AvroFlattenerMaker.java

@@ -36,7 +36,9 @@ import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
import java.nio.ByteBuffer;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
@@ -164,7 +166,7 @@ public class AvroFlattenerMaker implements ObjectFlatteners.FlattenerMaker<Gener
} else if (field instanceof Utf8) {
return field.toString();
} else if (field instanceof List) {
return ((List<?>) field).stream().filter(Objects::nonNull).collect(Collectors.toList());
return ((List<?>) field).stream().filter(Objects::nonNull).map(this::transformValue).collect(Collectors.toList());
} else if (field instanceof GenericEnumSymbol) {
return field.toString();
} else if (field instanceof GenericFixed) {
@@ -173,6 +175,20 @@ public class AvroFlattenerMaker implements ObjectFlatteners.FlattenerMaker<Gener
} else {
return ((GenericFixed) field).bytes();
}
} else if (field instanceof Map) {
LinkedHashMap<String, Object> retVal = new LinkedHashMap<>();
Map<?, ?> fieldMap = (Map<?, ?>) field;
for (Map.Entry<?, ?> entry : fieldMap.entrySet()) {
retVal.put(String.valueOf(entry.getKey()), transformValue(entry.getValue()));
}
return retVal;
} else if (field instanceof GenericRecord) {
LinkedHashMap<String, Object> retVal = new LinkedHashMap<>();
GenericRecord record = (GenericRecord) field;
for (Schema.Field key : record.getSchema().getFields()) {
retVal.put(key.name(), transformValue(record.get(key.pos())));
}
return retVal;
}
return field;
}
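
Stitched into one piece for readability, the patched conversion logic reads roughly as follows. This is a close paraphrase of the hunks above, not a verbatim copy; branches the diff does not show are elided, and it relies on org.apache.avro.Schema, org.apache.avro.generic.GenericRecord, java.util.LinkedHashMap/List/Map/Objects, and java.util.stream.Collectors.

private Object transformValue(final Object field)
{
  // ... ByteBuffer / numeric handling not shown in the diff ...
  if (field instanceof Utf8) {
    return field.toString();
  } else if (field instanceof List) {
    // Convert each element as well, instead of returning a list of raw Avro objects.
    return ((List<?>) field).stream()
                            .filter(Objects::nonNull)
                            .map(this::transformValue)
                            .collect(Collectors.toList());
  } else if (field instanceof GenericEnumSymbol) {
    return field.toString();
  } else if (field instanceof GenericFixed) {
    // ... binaryAsString handling not shown in the diff ...
    return ((GenericFixed) field).bytes();
  } else if (field instanceof Map) {
    // Avro map keys are Utf8, so normalize them to String and convert the values recursively.
    final LinkedHashMap<String, Object> retVal = new LinkedHashMap<>();
    for (Map.Entry<?, ?> entry : ((Map<?, ?>) field).entrySet()) {
      retVal.put(String.valueOf(entry.getKey()), transformValue(entry.getValue()));
    }
    return retVal;
  } else if (field instanceof GenericRecord) {
    // Nested records become ordinary maps keyed by field name, converted recursively.
    final LinkedHashMap<String, Object> retVal = new LinkedHashMap<>();
    final GenericRecord record = (GenericRecord) field;
    for (Schema.Field key : record.getSchema().getFields()) {
      retVal.put(key.name(), transformValue(record.get(key.pos())));
    }
    return retVal;
  }
  return field;
}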

AvroStreamInputRowParserTest.java

@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.avro.Schema;
@@ -64,6 +65,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
@@ -325,7 +327,46 @@ public class AvroStreamInputRowParserTest
inputRow.getDimension("someStringArray")
);
final Object someRecordArrayObj = inputRow.getRaw("someRecordArray");
Assert.assertNotNull(someRecordArrayObj);
Assert.assertTrue(someRecordArrayObj instanceof List);
Assert.assertEquals(1, ((List) someRecordArrayObj).size());
final Object recordArrayElementObj = ((List) someRecordArrayObj).get(0);
Assert.assertNotNull(recordArrayElementObj);
Assert.assertTrue(recordArrayElementObj instanceof LinkedHashMap);
LinkedHashMap recordArrayElement = (LinkedHashMap) recordArrayElementObj;
Assert.assertEquals("string in record", recordArrayElement.get("nestedString"));
}
final Object someIntValueMapObj = inputRow.getRaw("someIntValueMap");
Assert.assertNotNull(someIntValueMapObj);
Assert.assertTrue(someIntValueMapObj instanceof LinkedHashMap);
LinkedHashMap someIntValueMap = (LinkedHashMap) someIntValueMapObj;
Assert.assertEquals(4, someIntValueMap.size());
Assert.assertEquals(1, someIntValueMap.get("1"));
Assert.assertEquals(2, someIntValueMap.get("2"));
Assert.assertEquals(4, someIntValueMap.get("4"));
Assert.assertEquals(8, someIntValueMap.get("8"));
final Object someStringValueMapObj = inputRow.getRaw("someStringValueMap");
Assert.assertNotNull(someStringValueMapObj);
Assert.assertTrue(someStringValueMapObj instanceof LinkedHashMap);
LinkedHashMap someStringValueMap = (LinkedHashMap) someStringValueMapObj;
Assert.assertEquals(4, someStringValueMap.size());
Assert.assertEquals("1", someStringValueMap.get("1"));
Assert.assertEquals("2", someStringValueMap.get("2"));
Assert.assertEquals("4", someStringValueMap.get("4"));
Assert.assertEquals("8", someStringValueMap.get("8"));
final Object someRecordObj = inputRow.getRaw("someRecord");
Assert.assertNotNull(someRecordObj);
Assert.assertTrue(someRecordObj instanceof LinkedHashMap);
LinkedHashMap someRecord = (LinkedHashMap) someRecordObj;
Assert.assertEquals(4892, someRecord.get("subInt"));
Assert.assertEquals(1543698L, someRecord.get("subLong"));
// towards Map avro field as druid dimension, need to convert its toString() back to HashMap to check equality
Assert.assertEquals(1, inputRow.getDimension("someIntValueMap").size());
Assert.assertEquals(
@@ -369,7 +410,7 @@ public class AvroStreamInputRowParserTest
);
Assert.assertEquals(Collections.singletonList(String.valueOf(MyEnum.ENUM1)), inputRow.getDimension("someEnum"));
Assert.assertEquals(
Collections.singletonList(String.valueOf(SOME_RECORD_VALUE)),
Collections.singletonList(ImmutableMap.of("subInt", 4892, "subLong", 1543698L).toString()),
inputRow.getDimension("someRecord")
);
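
The assertions above hinge on the difference between InputRow.getRaw, which returns the stored object as-is (now a Map rather than an Avro record), and InputRow.getDimension, which coerces the value to strings; that is why the dimension check compares against ImmutableMap.of(...).toString(). A minimal sketch of that difference, assuming Druid's MapBasedInputRow and DateTimes utilities, with made-up event contents that mirror the fixture:

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.java.util.common.DateTimes;

import java.util.Map;

public class RawVsDimensionSketch
{
  public static void main(String[] args)
  {
    // A parsed event whose "someRecord" value is a plain Map, as the flattener now produces.
    Map<String, Object> event = ImmutableMap.<String, Object>of(
        "someRecord", ImmutableMap.of("subInt", 4892, "subLong", 1543698L)
    );
    InputRow row = new MapBasedInputRow(
        DateTimes.of("2022-08-17T00:00:00Z"),
        ImmutableList.of("someRecord"),
        event
    );

    // getRaw hands back the stored object unchanged: {subInt=4892, subLong=1543698}
    System.out.println(row.getRaw("someRecord"));
    // getDimension stringifies it: a one-element list holding the Map's toString(),
    // which is what the new dimension assertion checks against.
    System.out.println(row.getDimension("someRecord"));
  }
}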

AvroFlattenerMakerTest.java

@@ -19,6 +19,7 @@
package org.apache.druid.data.input.avro;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.druid.data.input.AvroStreamInputRowParserTest;
import org.apache.druid.data.input.SomeAvroDatum;
@@ -29,10 +30,12 @@ import org.junit.Assert;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class AvroFlattenerMakerTest
{
@@ -214,8 +217,13 @@ public class AvroFlattenerMakerTest
record.getSomeEnum().toString(),
flattener.getRootField(record, "someEnum")
);
Map<String, Object> map = new HashMap<>();
record.getSomeRecord()
.getSchema()
.getFields()
.forEach(field -> map.put(field.name(), record.getSomeRecord().get(field.name())));
Assert.assertEquals(
record.getSomeRecord(),
map,
flattener.getRootField(record, "someRecord")
);
Assert.assertEquals(
@@ -230,8 +238,17 @@ public class AvroFlattenerMakerTest
record.getSomeFloat(),
flattener.getRootField(record, "someFloat")
);
List<Map<String, Object>> list = new ArrayList<>();
for (GenericRecord genericRecord : record.getSomeRecordArray()) {
Map<String, Object> map1 = new HashMap<>();
genericRecord
.getSchema()
.getFields()
.forEach(field -> map1.put(field.name(), genericRecord.get(field.name())));
list.add(map1);
}
Assert.assertEquals(
record.getSomeRecordArray(),
list,
flattener.getRootField(record, "someRecordArray")
);
}
@@ -328,8 +345,13 @@ public class AvroFlattenerMakerTest
record.getSomeEnum().toString(),
flattener.makeJsonPathExtractor("$.someEnum").apply(record)
);
Map<String, Object> map = new HashMap<>();
record.getSomeRecord()
.getSchema()
.getFields()
.forEach(field -> map.put(field.name(), record.getSomeRecord().get(field.name())));
Assert.assertEquals(
record.getSomeRecord(),
map,
flattener.makeJsonPathExtractor("$.someRecord").apply(record)
);
Assert.assertEquals(
@@ -344,8 +366,19 @@ public class AvroFlattenerMakerTest
record.getSomeFloat(),
flattener.makeJsonPathExtractor("$.someFloat").apply(record)
);
List<Map<String, Object>> list = new ArrayList<>();
for (GenericRecord genericRecord : record.getSomeRecordArray()) {
Map<String, Object> map1 = new HashMap<>();
genericRecord
.getSchema()
.getFields()
.forEach(field -> map1.put(field.name(), genericRecord.get(field.name())));
list.add(map1);
}
Assert.assertEquals(
record.getSomeRecordArray(),
list,
flattener.makeJsonPathExtractor("$.someRecordArray").apply(record)
);
@@ -355,7 +388,7 @@ public class AvroFlattenerMakerTest
);
Assert.assertEquals(
record.getSomeRecordArray(),
list,
flattener.makeJsonPathExtractor("$.someRecordArray[?(@.nestedString)]").apply(record)
);
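
The expected values in the flattener tests above are rebuilt by hand several times with the same forEach over getSchema().getFields(). That pattern boils down to a small helper along these lines (an illustrative sketch, not part of the commit):

import org.apache.avro.generic.GenericRecord;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ExpectedFlattenedValues
{
  // Expected flattened shape of a single record: field name -> raw field value.
  static Map<String, Object> recordToMap(GenericRecord record)
  {
    Map<String, Object> map = new HashMap<>();
    record.getSchema().getFields().forEach(field -> map.put(field.name(), record.get(field.name())));
    return map;
  }

  // Expected flattened shape of a record array: one such map per element.
  static List<Map<String, Object>> recordsToMaps(Iterable<? extends GenericRecord> records)
  {
    List<Map<String, Object>> list = new ArrayList<>();
    records.forEach(record -> list.add(recordToMap(record)));
    return list;
  }
}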