From 7002ecd303984c15f31cc04e600bd6d53dd27ab8 Mon Sep 17 00:00:00 2001
From: Clint Wylie
Date: Fri, 9 Dec 2022 12:24:21 -0800
Subject: [PATCH] add protobuf flattener, direct to plain java conversion for faster flattening (#13519)

* add protobuf flattener, direct to plain java conversion for faster flattening, nested column tests
---
 .../common/parsers/FlattenerJsonProvider.java | 138 ++++++++++
 .../common/parsers/JSONFlattenerMaker.java    |  18 +-
 .../parsers/FlattenerJsonProviderTest.java    | 146 +++++++++++
 .../input/avro/GenericAvroJsonProvider.java   |  93 +------
 .../data/input/orc/OrcStructJsonProvider.java | 114 +-------
 .../simple/ParquetGroupJsonProvider.java      | 104 +-------
 extensions-core/protobuf-extensions/pom.xml   |  19 +-
 .../input/protobuf/ProtobufConverter.java     | 244 ++++++++++++++++++
 .../protobuf/ProtobufFlattenerMaker.java      | 114 ++++++++
 .../input/protobuf/ProtobufInputFormat.java   |   9 +-
 .../input/protobuf/ProtobufJsonProvider.java  |  79 ++++++
 .../data/input/protobuf/ProtobufReader.java   |  60 ++---
 .../protobuf/ProtobufInputFormatTest.java     | 243 ++++++++++++++++-
 .../input/protobuf/ProtobufReaderTest.java    |   8 -
 14 files changed, 1013 insertions(+), 376 deletions(-)
 create mode 100644 core/src/main/java/org/apache/druid/java/util/common/parsers/FlattenerJsonProvider.java
 create mode 100644 core/src/test/java/org/apache/druid/java/util/common/parsers/FlattenerJsonProviderTest.java
 create mode 100644 extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufConverter.java
 create mode 100644 extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufFlattenerMaker.java
 create mode 100644 extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufJsonProvider.java

diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/FlattenerJsonProvider.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/FlattenerJsonProvider.java
new file mode 100644
index 00000000000..2574f94b688
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/java/util/common/parsers/FlattenerJsonProvider.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package org.apache.druid.java.util.common.parsers; + +import com.jayway.jsonpath.InvalidJsonException; +import com.jayway.jsonpath.spi.json.JsonProvider; + +import java.io.InputStream; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +public abstract class FlattenerJsonProvider implements JsonProvider +{ + @Override + public Object createArray() + { + return new ArrayList<>(); + } + + @Override + public Object createMap() + { + return new LinkedHashMap<>(); + } + + @Override + public int length(final Object o) + { + if (o instanceof List) { + return ((List) o).size(); + } else { + return 0; + } + } + + @Override + public Iterable toIterable(final Object o) + { + if (o instanceof List) { + return (List) o; + } + throw new UnsupportedOperationException(o.getClass().getName()); + } + + @Override + public Object getArrayIndex(final Object o, final int i) + { + if (o instanceof List) { + return ((List) o).get(i); + } + throw new UnsupportedOperationException(o.getClass().getName()); + } + + @Override + public void setArrayIndex(final Object o, final int i, final Object o1) + { + if (o instanceof List) { + final List list = (List) o; + if (list.size() == i) { + list.add(o1); + } else { + list.set(i, o1); + } + } else { + throw new UnsupportedOperationException(o.getClass().getName()); + } + } + + @Override + public void setProperty(final Object o, final Object o1, final Object o2) + { + if (o instanceof Map) { + ((Map) o).put(o1, o2); + } else { + throw new UnsupportedOperationException(o.getClass().getName()); + } + } + + @Override + public void removeProperty(final Object o, final Object o1) + { + if (o instanceof Map) { + ((Map) o).remove(o1); + } else { + throw new UnsupportedOperationException(o.getClass().getName()); + } + } + + @Override + @Deprecated + public Object getArrayIndex(final Object o, final int i, final boolean b) + { + throw new UnsupportedOperationException("Deprecated"); + } + + @Override + public Object parse(final String s) throws InvalidJsonException + { + throw new UnsupportedOperationException("Unused"); + } + + @Override + public Object parse(final InputStream inputStream, final String s) throws InvalidJsonException + { + throw new UnsupportedOperationException("Unused"); + } + + @Override + public String toJson(final Object o) + { + throw new UnsupportedOperationException("Unused"); + } + + @Override + public Object unwrap(final Object o) + { + return o; + } +} diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMaker.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMaker.java index 589ed5ffbc5..b54b51f8634 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMaker.java +++ b/core/src/main/java/org/apache/druid/java/util/common/parsers/JSONFlattenerMaker.java @@ -54,9 +54,9 @@ public class JSONFlattenerMaker implements ObjectFlatteners.FlattenerMaker discoverRootFields(final JsonNode obj) { - return FluentIterable.from(() -> obj.fields()) + return FluentIterable.from(obj::fields) .filter( entry -> { final JsonNode val = entry.getValue(); @@ -137,13 +137,13 @@ public class JSONFlattenerMaker implements ObjectFlatteners.FlattenerMaker newList = new ArrayList<>(); for (JsonNode entry : val) { if (!entry.isNull()) { - newList.add(finalizeConversionForMap(entry)); + newList.add(convertJsonNode(entry, enc)); } } return newList; @@ -185,7 +185,7 @@ public class JSONFlattenerMaker implements 
ObjectFlatteners.FlattenerMaker newMap = new LinkedHashMap<>(); for (Iterator> it = val.fields(); it.hasNext(); ) { Map.Entry entry = it.next(); - newMap.put(entry.getKey(), finalizeConversionForMap(entry.getValue())); + newMap.put(entry.getKey(), convertJsonNode(entry.getValue(), enc)); } return newMap; } @@ -197,7 +197,7 @@ public class JSONFlattenerMaker implements ObjectFlatteners.FlattenerMaker getPropertyKeys(final Object o) + { + throw new RuntimeException("not tested"); + } + + @Override + public Object getMapValue(final Object o, final String s) + { + throw new RuntimeException("not tested"); + } + }; + + @Test + public void testMapStuff() + { + Object aMap = jsonProvider.createMap(); + jsonProvider.setProperty(aMap, "key", "value"); + Assert.assertEquals(ImmutableMap.of("key", "value"), aMap); + jsonProvider.removeProperty(aMap, "key"); + Assert.assertEquals(ImmutableMap.of(), aMap); + Assert.assertEquals(aMap, jsonProvider.unwrap(aMap)); + + Assert.assertThrows( + UnsupportedOperationException.class, + () -> jsonProvider.setProperty(jsonProvider.createArray(), "key", "value") + ); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> jsonProvider.removeProperty(jsonProvider.createArray(), "key") + ); + } + + @Test + public void testArrayStuff() + { + Object aList = jsonProvider.createArray(); + jsonProvider.setArrayIndex(aList, 0, "a"); + jsonProvider.setArrayIndex(aList, 1, "b"); + jsonProvider.setArrayIndex(aList, 2, "c"); + Assert.assertEquals(3, jsonProvider.length(aList)); + Assert.assertEquals("a", jsonProvider.getArrayIndex(aList, 0)); + Assert.assertEquals("b", jsonProvider.getArrayIndex(aList, 1)); + Assert.assertEquals("c", jsonProvider.getArrayIndex(aList, 2)); + List expected = ImmutableList.of("a", "b", "c"); + Assert.assertEquals(expected, aList); + Iterator iter = jsonProvider.toIterable(aList).iterator(); + Iterator expectedIter = expected.iterator(); + while (iter.hasNext()) { + Assert.assertEquals(expectedIter.next(), iter.next()); + } + Assert.assertFalse(expectedIter.hasNext()); + Assert.assertEquals(aList, jsonProvider.unwrap(aList)); + + Assert.assertThrows( + UnsupportedOperationException.class, + () -> jsonProvider.getArrayIndex(jsonProvider.createMap(), 0) + ); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> jsonProvider.setArrayIndex(jsonProvider.createMap(), 0, "a") + ); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> jsonProvider.toIterable(jsonProvider.createMap()) + ); + } + + @Test + public void testNotImplementedOnPurpose() + { + Object aList = jsonProvider.createArray(); + Throwable t = Assert.assertThrows( + UnsupportedOperationException.class, + () -> jsonProvider.toJson(aList) + ); + Assert.assertEquals("Unused", t.getMessage()); + + t = Assert.assertThrows( + UnsupportedOperationException.class, + () -> jsonProvider.parse("{}") + ); + Assert.assertEquals("Unused", t.getMessage()); + + + t = Assert.assertThrows( + UnsupportedOperationException.class, + () -> jsonProvider.parse(new ByteArrayInputStream(StringUtils.toUtf8("{}")), "UTF-8") + ); + Assert.assertEquals("Unused", t.getMessage()); + + t = Assert.assertThrows( + UnsupportedOperationException.class, + () -> jsonProvider.getArrayIndex(aList, 0, false) + ); + Assert.assertEquals("Deprecated", t.getMessage()); + } +} diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/GenericAvroJsonProvider.java 
b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/GenericAvroJsonProvider.java index 86bbb7d7e97..07055007673 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/GenericAvroJsonProvider.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/GenericAvroJsonProvider.java @@ -20,22 +20,17 @@ package org.apache.druid.data.input.avro; import com.google.common.collect.ImmutableMap; -import com.jayway.jsonpath.InvalidJsonException; -import com.jayway.jsonpath.spi.json.JsonProvider; import org.apache.avro.Schema; import org.apache.avro.generic.GenericEnumSymbol; import org.apache.avro.generic.GenericFixed; import org.apache.avro.generic.GenericRecord; import org.apache.avro.util.Utf8; +import org.apache.druid.java.util.common.parsers.FlattenerJsonProvider; import javax.annotation.Nullable; - -import java.io.InputStream; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -43,7 +38,7 @@ import java.util.stream.Collectors; /** * JsonProvider for JsonPath + Avro. */ -public class GenericAvroJsonProvider implements JsonProvider +public class GenericAvroJsonProvider extends FlattenerJsonProvider { private final boolean extractUnionsByType; @@ -52,36 +47,6 @@ public class GenericAvroJsonProvider implements JsonProvider this.extractUnionsByType = extractUnionsByType; } - @Override - public Object parse(final String s) throws InvalidJsonException - { - throw new UnsupportedOperationException("Unused"); - } - - @Override - public Object parse(final InputStream inputStream, final String s) throws InvalidJsonException - { - throw new UnsupportedOperationException("Unused"); - } - - @Override - public String toJson(final Object o) - { - throw new UnsupportedOperationException("Unused"); - } - - @Override - public Object createArray() - { - return new ArrayList<>(); - } - - @Override - public Object createMap() - { - return new HashMap<>(); - } - @Override public boolean isArray(final Object o) { @@ -100,16 +65,6 @@ public class GenericAvroJsonProvider implements JsonProvider } } - @Override - public Iterable toIterable(final Object o) - { - if (o instanceof List) { - return (List) o; - } else { - throw new UnsupportedOperationException(); - } - } - @Override public Collection getPropertyKeys(final Object o) { @@ -124,34 +79,6 @@ public class GenericAvroJsonProvider implements JsonProvider } } - @Override - public Object getArrayIndex(final Object o, final int i) - { - return ((List) o).get(i); - } - - @Override - @Deprecated - public Object getArrayIndex(final Object o, final int i, final boolean b) - { - throw new UnsupportedOperationException("Deprecated"); - } - - @Override - public void setArrayIndex(final Object o, final int i, final Object o1) - { - if (o instanceof List) { - final List list = (List) o; - if (list.size() == i) { - list.add(o1); - } else { - list.set(i, o1); - } - } else { - throw new UnsupportedOperationException(); - } - } - @Nullable @Override public Object getMapValue(final Object o, final String s) @@ -189,28 +116,12 @@ public class GenericAvroJsonProvider implements JsonProvider } } - @Override - public void removeProperty(final Object o, final Object o1) - { - if (o instanceof Map) { - ((Map) o).remove(o1); - } else { - throw new UnsupportedOperationException(); - } - } - @Override public boolean 
isMap(final Object o) { return o == null || o instanceof Map || o instanceof GenericRecord; } - @Override - public Object unwrap(final Object o) - { - return o; - } - private boolean isExtractableUnion(final Schema.Field field) { return field.schema().isUnion() && diff --git a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructJsonProvider.java b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructJsonProvider.java index 15f81b6f203..384a8f833af 100644 --- a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructJsonProvider.java +++ b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructJsonProvider.java @@ -19,22 +19,18 @@ package org.apache.druid.data.input.orc; -import com.jayway.jsonpath.InvalidJsonException; -import com.jayway.jsonpath.spi.json.JsonProvider; +import org.apache.druid.java.util.common.parsers.FlattenerJsonProvider; import org.apache.hadoop.io.Text; import org.apache.orc.mapred.OrcMap; import org.apache.orc.mapred.OrcStruct; -import java.io.InputStream; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; -public class OrcStructJsonProvider implements JsonProvider +public class OrcStructJsonProvider extends FlattenerJsonProvider { private final OrcStructConverter converter; @@ -43,18 +39,6 @@ public class OrcStructJsonProvider implements JsonProvider this.converter = converter; } - @Override - public Object createArray() - { - return new ArrayList<>(); - } - - @Override - public Object createMap() - { - return new HashMap<>(); - } - @Override public boolean isArray(final Object o) { @@ -67,25 +51,6 @@ public class OrcStructJsonProvider implements JsonProvider return o == null || o instanceof Map || o instanceof OrcStruct; } - @Override - public int length(final Object o) - { - if (o instanceof List) { - return ((List) o).size(); - } else { - return 0; - } - } - - @Override - public Iterable toIterable(final Object o) - { - if (o instanceof List) { - return (List) o; - } - throw new UnsupportedOperationException(o.getClass().getName()); - } - @Override public Collection getPropertyKeys(final Object o) { @@ -117,79 +82,4 @@ public class OrcStructJsonProvider implements JsonProvider } throw new UnsupportedOperationException(o.getClass().getName()); } - - @Override - public Object getArrayIndex(final Object o, final int i) - { - if (o instanceof List) { - return ((List) o).get(i); - } - throw new UnsupportedOperationException(o.getClass().getName()); - } - - @Override - public void setArrayIndex(final Object o, final int i, final Object o1) - { - if (o instanceof List) { - final List list = (List) o; - if (list.size() == i) { - list.add(o1); - } else { - list.set(i, o1); - } - } else { - throw new UnsupportedOperationException(o.getClass().getName()); - } - } - - @Override - public void setProperty(final Object o, final Object o1, final Object o2) - { - if (o instanceof Map) { - ((Map) o).put(o1, o2); - } else { - throw new UnsupportedOperationException(o.getClass().getName()); - } - } - - @Override - public void removeProperty(final Object o, final Object o1) - { - if (o instanceof Map) { - ((Map) o).remove(o1); - } else { - throw new UnsupportedOperationException(o.getClass().getName()); - } - } - - @Override - @Deprecated - public Object getArrayIndex(final Object o, final int i, final boolean b) - { 
- throw new UnsupportedOperationException("Deprecated"); - } - - @Override - public Object parse(final String s) throws InvalidJsonException - { - throw new UnsupportedOperationException("Unused"); - } - - @Override - public Object parse(final InputStream inputStream, final String s) throws InvalidJsonException - { - throw new UnsupportedOperationException("Unused"); - } - - @Override - public String toJson(final Object o) - { - throw new UnsupportedOperationException("Unused"); - } - - @Override - public Object unwrap(final Object o) - { - return o; - } } diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupJsonProvider.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupJsonProvider.java index 3190c7e28ad..27234ea24a3 100644 --- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupJsonProvider.java +++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetGroupJsonProvider.java @@ -19,15 +19,11 @@ package org.apache.druid.data.input.parquet.simple; -import com.jayway.jsonpath.InvalidJsonException; -import com.jayway.jsonpath.spi.json.JsonProvider; +import org.apache.druid.java.util.common.parsers.FlattenerJsonProvider; import org.apache.parquet.example.data.Group; -import java.io.InputStream; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -35,7 +31,7 @@ import java.util.stream.Collectors; /** * Provides json path for Parquet {@link Group} objects */ -public class ParquetGroupJsonProvider implements JsonProvider +public class ParquetGroupJsonProvider extends FlattenerJsonProvider { private final ParquetGroupConverter converter; @@ -44,18 +40,6 @@ public class ParquetGroupJsonProvider implements JsonProvider this.converter = converter; } - @Override - public Object createArray() - { - return new ArrayList<>(); - } - - @Override - public Object createMap() - { - return new HashMap<>(); - } - @Override public boolean isArray(final Object o) { @@ -82,15 +66,6 @@ public class ParquetGroupJsonProvider implements JsonProvider } } - @Override - public Iterable toIterable(final Object o) - { - if (o instanceof List) { - return (List) o; - } - throw new UnsupportedOperationException(o.getClass().getName()); - } - @Override public Collection getPropertyKeys(final Object o) { @@ -118,80 +93,5 @@ public class ParquetGroupJsonProvider implements JsonProvider } throw new UnsupportedOperationException(o.getClass().getName()); } - - @Override - public Object getArrayIndex(final Object o, final int i) - { - if (o instanceof List) { - return ((List) o).get(i); - } - throw new UnsupportedOperationException(o.getClass().getName()); - } - - @Override - public void setArrayIndex(final Object o, final int i, final Object o1) - { - if (o instanceof List) { - final List list = (List) o; - if (list.size() == i) { - list.add(o1); - } else { - list.set(i, o1); - } - } else { - throw new UnsupportedOperationException(o.getClass().getName()); - } - } - - @Override - public void setProperty(final Object o, final Object o1, final Object o2) - { - if (o instanceof Map) { - ((Map) o).put(o1, o2); - } else { - throw new UnsupportedOperationException(o.getClass().getName()); - } - } - - @Override - public void removeProperty(final Object o, final Object o1) 
- { - if (o instanceof Map) { - ((Map) o).remove(o1); - } else { - throw new UnsupportedOperationException(o.getClass().getName()); - } - } - - @Override - @Deprecated - public Object getArrayIndex(final Object o, final int i, final boolean b) - { - throw new UnsupportedOperationException("Deprecated"); - } - - @Override - public Object parse(final String s) throws InvalidJsonException - { - throw new UnsupportedOperationException("Unused"); - } - - @Override - public Object parse(final InputStream inputStream, final String s) throws InvalidJsonException - { - throw new UnsupportedOperationException("Unused"); - } - - @Override - public String toJson(final Object o) - { - throw new UnsupportedOperationException("Unused"); - } - - @Override - public Object unwrap(final Object o) - { - return o; - } } diff --git a/extensions-core/protobuf-extensions/pom.xml b/extensions-core/protobuf-extensions/pom.xml index 0085f21aa56..ab0b7ed2e27 100644 --- a/extensions-core/protobuf-extensions/pom.xml +++ b/extensions-core/protobuf-extensions/pom.xml @@ -52,6 +52,12 @@ ${project.parent.version} provided + + org.apache.druid + druid-processing + ${project.parent.version} + provided + com.google.protobuf protobuf-java @@ -133,13 +139,22 @@ com.google.code.findbugs jsr305 - 2.0.1 provided com.fasterxml.jackson.core jackson-core + + com.jayway.jsonpath + json-path + provided + + + net.thisptr + jackson-jq + provided + @@ -161,7 +176,7 @@ org.apache.druid druid-processing ${project.parent.version} - test + test-jar nl.jqno.equalsverifier diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufConverter.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufConverter.java new file mode 100644 index 00000000000..a032e33ac8a --- /dev/null +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufConverter.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.protobuf; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.protobuf.Any; +import com.google.protobuf.BoolValue; +import com.google.protobuf.ByteString; +import com.google.protobuf.BytesValue; +import com.google.protobuf.Descriptors; +import com.google.protobuf.DoubleValue; +import com.google.protobuf.Duration; +import com.google.protobuf.FieldMask; +import com.google.protobuf.FloatValue; +import com.google.protobuf.Int32Value; +import com.google.protobuf.Int64Value; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.ListValue; +import com.google.protobuf.Message; +import com.google.protobuf.StringValue; +import com.google.protobuf.Struct; +import com.google.protobuf.Timestamp; +import com.google.protobuf.UInt32Value; +import com.google.protobuf.UInt64Value; +import com.google.protobuf.Value; +import com.google.protobuf.util.Durations; +import com.google.protobuf.util.FieldMaskUtil; +import com.google.protobuf.util.JsonFormat; +import com.google.protobuf.util.Timestamps; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Convert {@link Message} to plain java stuffs, based roughly on the conversions done with {@link JsonFormat} + */ +public class ProtobufConverter +{ + private static final Map SPECIAL_CONVERSIONS = buildSpecializedConversions(); + + @Nullable + public static Map convertMessage(Message msg) throws InvalidProtocolBufferException + { + if (msg == null) { + return null; + } + final Map fields = msg.getAllFields(); + final Map converted = Maps.newHashMapWithExpectedSize(fields.size()); + for (Map.Entry field : fields.entrySet()) { + converted.put(field.getKey().getJsonName(), convertField(field.getKey(), field.getValue())); + } + return converted; + } + + @Nullable + private static Object convertField(Descriptors.FieldDescriptor field, Object value) + throws InvalidProtocolBufferException + { + // handle special types + if (value instanceof Message) { + Message msg = (Message) value; + final String typeName = msg.getDescriptorForType().getFullName(); + SpecializedConverter converter = SPECIAL_CONVERSIONS.get(typeName); + if (converter != null) { + return converter.convert(msg); + } + } + + if (field.isMapField()) { + return convertMap(field, value); + } else if (field.isRepeated()) { + return convertList(field, (List) value); + } else { + return convertSingleValue(field, value); + } + } + + @Nonnull + private static List convertList(Descriptors.FieldDescriptor field, List value) + throws InvalidProtocolBufferException + { + final List theList = Lists.newArrayListWithExpectedSize(value.size()); + for (Object element : value) { + theList.add(convertSingleValue(field, element)); + } + return theList; + } + + @Nullable + private static Object convertMap(Descriptors.FieldDescriptor field, Object value) + throws InvalidProtocolBufferException + { + final Descriptors.Descriptor type = field.getMessageType(); + final Descriptors.FieldDescriptor keyField = type.findFieldByName("key"); + final Descriptors.FieldDescriptor valueField = type.findFieldByName("value"); + if (keyField == null || valueField == null) { + throw new InvalidProtocolBufferException("Invalid map field."); + } + + @SuppressWarnings("unchecked") + final List elements = (List) value; + final HashMap theMap = Maps.newHashMapWithExpectedSize(elements.size()); + for (Object element : 
elements) { + Message entry = (Message) element; + theMap.put( + (String) convertSingleValue(keyField, entry.getField(keyField)), + convertSingleValue(valueField, entry.getField(valueField)) + ); + } + return theMap; + } + + @Nullable + private static Object convertSingleValue(Descriptors.FieldDescriptor field, Object value) + throws InvalidProtocolBufferException + { + switch (field.getType()) { + case BYTES: + return ((ByteString) value).toByteArray(); + case ENUM: + if ("google.protobuf.NullValue".equals(field.getEnumType().getFullName())) { + return null; + } else { + return ((Descriptors.EnumValueDescriptor) value).getName(); + } + case MESSAGE: + case GROUP: + return convertMessage((Message) value); + default: + // pass through everything else + return value; + } + } + + private static Map buildSpecializedConversions() + { + final Map converters = new HashMap<>(); + final SpecializedConverter parappaTheWrappa = msg -> { + final Descriptors.Descriptor descriptor = msg.getDescriptorForType(); + final Descriptors.FieldDescriptor valueField = descriptor.findFieldByName("value"); + if (valueField == null) { + throw new InvalidProtocolBufferException("Invalid Wrapper type."); + } + return convertSingleValue(valueField, msg.getField(valueField)); + }; + converters.put(BoolValue.getDescriptor().getFullName(), parappaTheWrappa); + converters.put(Int32Value.getDescriptor().getFullName(), parappaTheWrappa); + converters.put(UInt32Value.getDescriptor().getFullName(), parappaTheWrappa); + converters.put(Int64Value.getDescriptor().getFullName(), parappaTheWrappa); + converters.put(UInt64Value.getDescriptor().getFullName(), parappaTheWrappa); + converters.put(StringValue.getDescriptor().getFullName(), parappaTheWrappa); + converters.put(BytesValue.getDescriptor().getFullName(), parappaTheWrappa); + converters.put(FloatValue.getDescriptor().getFullName(), parappaTheWrappa); + converters.put(DoubleValue.getDescriptor().getFullName(), parappaTheWrappa); + converters.put( + Any.getDescriptor().getFullName(), + msg -> JsonFormat.printer().print(msg) // meh + ); + converters.put( + Timestamp.getDescriptor().getFullName(), + msg -> { + final Timestamp ts = Timestamp.parseFrom(msg.toByteString()); + return Timestamps.toString(ts); + } + ); + converters.put( + Duration.getDescriptor().getFullName(), + msg -> { + final Duration duration = Duration.parseFrom(msg.toByteString()); + return Durations.toString(duration); + } + ); + converters.put( + FieldMask.getDescriptor().getFullName(), + msg -> FieldMaskUtil.toJsonString(FieldMask.parseFrom(msg.toByteString())) + ); + converters.put( + Struct.getDescriptor().getFullName(), + msg -> { + final Descriptors.Descriptor descriptor = msg.getDescriptorForType(); + final Descriptors.FieldDescriptor field = descriptor.findFieldByName("fields"); + if (field == null) { + throw new InvalidProtocolBufferException("Invalid Struct type."); + } + // Struct is formatted as a map object. 
+ return convertSingleValue(field, msg.getField(field)); + } + ); + converters.put( + Value.getDescriptor().getFullName(), + msg -> { + final Map fields = msg.getAllFields(); + if (fields.isEmpty()) { + return null; + } + if (fields.size() != 1) { + throw new InvalidProtocolBufferException("Invalid Value type."); + } + final Map.Entry entry = fields.entrySet().stream().findFirst().get(); + return convertSingleValue(entry.getKey(), entry.getValue()); + } + ); + converters.put( + ListValue.getDescriptor().getFullName(), + msg -> { + Descriptors.Descriptor descriptor = msg.getDescriptorForType(); + Descriptors.FieldDescriptor field = descriptor.findFieldByName("values"); + if (field == null) { + throw new InvalidProtocolBufferException("Invalid ListValue type."); + } + return convertList(field, (List) msg.getField(field)); + } + ); + return converters; + } + + @FunctionalInterface + interface SpecializedConverter + { + @Nullable + Object convert(Message msg) throws InvalidProtocolBufferException; + } +} diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufFlattenerMaker.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufFlattenerMaker.java new file mode 100644 index 00000000000..a3314e42731 --- /dev/null +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufFlattenerMaker.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.protobuf; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Sets; +import com.jayway.jsonpath.Configuration; +import com.jayway.jsonpath.JsonPath; +import com.jayway.jsonpath.Option; +import com.jayway.jsonpath.spi.json.JsonProvider; +import net.thisptr.jackson.jq.JsonQuery; +import net.thisptr.jackson.jq.exception.JsonQueryException; +import org.apache.druid.java.util.common.parsers.JSONFlattenerMaker; +import org.apache.druid.java.util.common.parsers.NotImplementedMappingProvider; +import org.apache.druid.java.util.common.parsers.ObjectFlatteners; + +import java.nio.charset.CharsetEncoder; +import java.nio.charset.StandardCharsets; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; + +/** + * Basically a plain java object {@link ObjectFlatteners.FlattenerMaker}, but it lives here for now... 
+ */ +public class ProtobufFlattenerMaker implements ObjectFlatteners.FlattenerMaker> +{ + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final ProtobufJsonProvider JSON_PROVIDER = new ProtobufJsonProvider(); + + private static final Configuration CONFIG = Configuration.builder() + .jsonProvider(JSON_PROVIDER) + .mappingProvider(new NotImplementedMappingProvider()) + .options(EnumSet.of(Option.SUPPRESS_EXCEPTIONS)) + .build(); + + private final CharsetEncoder enc = StandardCharsets.UTF_8.newEncoder(); + + @Override + public JsonProvider getJsonProvider() + { + return JSON_PROVIDER; + } + + @Override + public Iterable discoverRootFields(Map obj) + { + // in the future we can just return obj.keySet(), but for now this doesnt expect nested fields... + Set rootFields = Sets.newHashSetWithExpectedSize(obj.keySet().size()); + for (Map.Entry entry : obj.entrySet()) { + if (entry.getValue() instanceof List || entry.getValue() instanceof Map) { + continue; + } + rootFields.add(entry.getKey()); + } + return rootFields; + } + + @Override + public Object getRootField(Map obj, String key) + { + return obj.get(key); + } + + @Override + public Function, Object> makeJsonPathExtractor(String expr) + { + final JsonPath path = JsonPath.compile(expr); + return map -> path.read(map, CONFIG); + } + + @Override + public Function, Object> makeJsonQueryExtractor(String expr) + { + final JsonQuery jsonQuery; + try { + jsonQuery = JsonQuery.compile(expr); + } + catch (JsonQueryException e) { + throw new RuntimeException(e); + } + return map -> { + try { + return JSONFlattenerMaker.convertJsonNode( + jsonQuery.apply((JsonNode) OBJECT_MAPPER.valueToTree(map)).get(0), + enc + ); + } + catch (JsonQueryException e) { + throw new RuntimeException(e); + } + }; + } +} diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputFormat.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputFormat.java index 36ae06875e3..e1ae1a0e713 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputFormat.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputFormat.java @@ -28,7 +28,6 @@ import org.apache.druid.data.input.impl.NestedInputFormat; import org.apache.druid.java.util.common.parsers.JSONPathSpec; import javax.annotation.Nullable; - import java.io.File; import java.util.Objects; @@ -61,12 +60,7 @@ public class ProtobufInputFormat extends NestedInputFormat @Override public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) { - return new ProtobufReader( - inputRowSchema, - source, - protobufBytesDecoder, - getFlattenSpec() - ); + return new ProtobufReader(inputRowSchema, source, protobufBytesDecoder, getFlattenSpec()); } @Override @@ -88,5 +82,4 @@ public class ProtobufInputFormat extends NestedInputFormat { return Objects.hash(protobufBytesDecoder, getFlattenSpec()); } - } diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufJsonProvider.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufJsonProvider.java new file mode 100644 index 00000000000..c8e3ebeba73 --- /dev/null +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufJsonProvider.java @@ -0,0 +1,79 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.protobuf; + +import org.apache.druid.java.util.common.parsers.FlattenerJsonProvider; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Basically a plain java object {@link FlattenerJsonProvider}, but it lives here for now... + */ +public class ProtobufJsonProvider extends FlattenerJsonProvider +{ + @Override + public boolean isArray(final Object o) + { + return o instanceof List; + } + + @Override + public boolean isMap(final Object o) + { + return o == null || o instanceof Map; + } + + @Override + public int length(final Object o) + { + if (o instanceof List) { + return ((List) o).size(); + } else { + return 0; + } + } + + @Override + public Collection getPropertyKeys(final Object o) + { + if (o == null) { + return Collections.emptySet(); + } else if (o instanceof Map) { + return ((Map) o).keySet().stream().map(String::valueOf).collect(Collectors.toSet()); + } else { + throw new UnsupportedOperationException(o.getClass().getName()); + } + } + + @Override + public Object getMapValue(final Object o, final String s) + { + if (o == null) { + return null; + } else if (o instanceof Map) { + return ((Map) o).get(s); + } + throw new UnsupportedOperationException(o.getClass().getName()); + } +} diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java index d512d108c33..2dc6aa1f8ca 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java @@ -19,13 +19,10 @@ package org.apache.druid.data.input.protobuf; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Iterators; import com.google.protobuf.DynamicMessage; import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.util.JsonFormat; +import com.google.protobuf.Message; import org.apache.commons.io.IOUtils; import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputRow; @@ -34,12 +31,10 @@ import org.apache.druid.data.input.IntermediateRowParsingReader; import org.apache.druid.data.input.impl.MapInputRowParser; import org.apache.druid.java.util.common.CloseableIterators; import org.apache.druid.java.util.common.parsers.CloseableIterator; -import 
org.apache.druid.java.util.common.parsers.JSONFlattenerMaker; import org.apache.druid.java.util.common.parsers.JSONPathSpec; import org.apache.druid.java.util.common.parsers.ObjectFlattener; import org.apache.druid.java.util.common.parsers.ObjectFlatteners; import org.apache.druid.java.util.common.parsers.ParseException; -import org.apache.druid.utils.CollectionUtils; import java.io.IOException; import java.nio.ByteBuffer; @@ -49,11 +44,9 @@ import java.util.Map; public class ProtobufReader extends IntermediateRowParsingReader { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private final InputRowSchema inputRowSchema; private final InputEntity source; - private final JSONPathSpec flattenSpec; - private final ObjectFlattener recordFlattener; + private final ObjectFlattener> recordFlattener; private final ProtobufBytesDecoder protobufBytesDecoder; ProtobufReader( @@ -63,17 +56,10 @@ public class ProtobufReader extends IntermediateRowParsingReader JSONPathSpec flattenSpec ) { - if (flattenSpec == null) { - this.inputRowSchema = new ProtobufInputRowSchema(inputRowSchema); - this.recordFlattener = null; - } else { - this.inputRowSchema = inputRowSchema; - this.recordFlattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(true)); - } - + this.inputRowSchema = inputRowSchema; + this.recordFlattener = ObjectFlatteners.create(flattenSpec, new ProtobufFlattenerMaker()); this.source = source; this.protobufBytesDecoder = protobufBytesDecoder; - this.flattenSpec = flattenSpec; } @Override @@ -91,33 +77,27 @@ public class ProtobufReader extends IntermediateRowParsingReader } @Override - protected List parseInputRows(DynamicMessage intermediateRow) throws ParseException, JsonProcessingException + protected List parseInputRows(DynamicMessage intermediateRow) throws ParseException { - Map record; - - if (flattenSpec == null || JSONPathSpec.DEFAULT.equals(flattenSpec)) { - try { - record = CollectionUtils.mapKeys(intermediateRow.getAllFields(), k -> k.getJsonName()); - } - catch (Exception ex) { - throw new ParseException(null, ex, "Protobuf message could not be parsed"); - } - } else { - try { - String json = JsonFormat.printer().print(intermediateRow); - record = recordFlattener.flatten(OBJECT_MAPPER.readValue(json, JsonNode.class)); - } - catch (InvalidProtocolBufferException e) { - throw new ParseException(null, e, "Protobuf message could not be parsed"); - } - } - + final Map record; + final Map plainJava = convertMessage(intermediateRow); + record = recordFlattener.flatten(plainJava); return Collections.singletonList(MapInputRowParser.parse(inputRowSchema, record)); } @Override - protected List> toMap(DynamicMessage intermediateRow) throws JsonProcessingException, InvalidProtocolBufferException + protected List> toMap(DynamicMessage intermediateRow) { - return Collections.singletonList(new ObjectMapper().readValue(JsonFormat.printer().print(intermediateRow), Map.class)); + return Collections.singletonList(convertMessage(intermediateRow)); + } + + private static Map convertMessage(Message msg) + { + try { + return ProtobufConverter.convertMessage(msg); + } + catch (InvalidProtocolBufferException e) { + throw new ParseException(null, e, "Protobuf message could not be parsed"); + } } } diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java index 219fb23aac7..1d0cfe91665 
100644 --- a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java +++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java @@ -22,8 +22,12 @@ package org.apache.druid.data.input.protobuf; import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.io.Files; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.data.input.InputEntityReader; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.impl.ByteEntity; @@ -36,6 +40,12 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; import org.apache.druid.java.util.common.parsers.JSONPathFieldType; import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.druid.math.expr.ExpressionProcessing; +import org.apache.druid.query.expression.TestExprMacroTable; +import org.apache.druid.segment.NestedDataDimensionSchema; +import org.apache.druid.segment.transform.ExpressionTransform; +import org.apache.druid.segment.transform.TransformSpec; +import org.apache.druid.segment.transform.TransformingInputEntityReader; import org.joda.time.DateTime; import org.joda.time.chrono.ISOChronology; import org.junit.Assert; @@ -46,6 +56,7 @@ import org.junit.rules.ExpectedException; import java.io.File; import java.io.IOException; +import java.util.Collections; public class ProtobufInputFormatTest { @@ -63,6 +74,8 @@ public class ProtobufInputFormatTest @Before public void setUp() throws Exception { + NullHandling.initializeForTests(); + ExpressionProcessing.initializeForTests(null); timestampSpec = new TimestampSpec("timestamp", "iso", null); dimensionsSpec = new DimensionsSpec(Lists.newArrayList( new StringDimensionSchema("event"), @@ -125,7 +138,7 @@ public class ProtobufInputFormatTest } @Test - public void testParseNestedData() throws Exception + public void testParseFlattenData() throws Exception { //configure parser with desc file ProtobufInputFormat protobufInputFormat = new ProtobufInputFormat(flattenSpec, decoder); @@ -136,11 +149,225 @@ public class ProtobufInputFormatTest final ByteEntity entity = new ByteEntity(ProtobufInputRowParserTest.toByteBuffer(event)); - InputRow row = protobufInputFormat.createReader(new InputRowSchema(timestampSpec, dimensionsSpec, null), entity, null).read().next(); + InputRow row = protobufInputFormat.createReader( + new InputRowSchema(timestampSpec, dimensionsSpec, null), + entity, + null + ).read().next(); + + Assert.assertEquals( + ImmutableList.builder() + .add("event") + .add("id") + .add("someOtherId") + .add("isValid") + .build(), + row.getDimensions() + ); ProtobufInputRowParserTest.verifyNestedData(row, dateTime); } + @Test + public void testParseFlattenDataJq() throws Exception + { + //configure parser with desc file + ProtobufInputFormat protobufInputFormat = new ProtobufInputFormat( + new JSONPathSpec( + true, + Lists.newArrayList( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "eventType", "eventType"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "foobar", ".foo.bar"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "bar0", ".bar[0].bar") + ) 
+ ), + decoder + ); + + //create binary of proto test event + DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC()); + ProtoTestEventWrapper.ProtoTestEvent event = ProtobufInputRowParserTest.buildNestedData(dateTime); + + final ByteEntity entity = new ByteEntity(ProtobufInputRowParserTest.toByteBuffer(event)); + + InputRow row = protobufInputFormat.createReader( + new InputRowSchema(timestampSpec, dimensionsSpec, null), + entity, + null + ).read().next(); + + Assert.assertEquals( + ImmutableList.builder() + .add("event") + .add("id") + .add("someOtherId") + .add("isValid") + .build(), + row.getDimensions() + ); + + ProtobufInputRowParserTest.verifyNestedData(row, dateTime); + } + + @Test + public void testParseFlattenDataDiscover() throws Exception + { + //configure parser with desc file + ProtobufInputFormat protobufInputFormat = new ProtobufInputFormat(flattenSpec, decoder); + + //create binary of proto test event + DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC()); + ProtoTestEventWrapper.ProtoTestEvent event = ProtobufInputRowParserTest.buildNestedData(dateTime); + + final ByteEntity entity = new ByteEntity(ProtobufInputRowParserTest.toByteBuffer(event)); + + InputRow row = protobufInputFormat.createReader( + new InputRowSchema(timestampSpec, new DimensionsSpec(Collections.emptyList()), null), + entity, + null + ).read().next(); + + Assert.assertEquals( + ImmutableList.builder() + .add("eventType") + .add("foobar") + .add("bar0") + .add("someOtherId") + .add("someIntColumn") + .add("isValid") + .add("description") + .add("someLongColumn") + .add("someFloatColumn") + .add("id") + .add("timestamp") + .build(), + row.getDimensions() + ); + + ProtobufInputRowParserTest.verifyNestedData(row, dateTime); + } + + @Test + public void testParseNestedData() throws Exception + { + ProtobufInputFormat protobufInputFormat = new ProtobufInputFormat( + JSONPathSpec.DEFAULT, + decoder + ); + + //create binary of proto test event + DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC()); + ProtoTestEventWrapper.ProtoTestEvent event = ProtobufInputRowParserTest.buildNestedData(dateTime); + + final ByteEntity entity = new ByteEntity(ProtobufInputRowParserTest.toByteBuffer(event)); + + InputEntityReader reader = protobufInputFormat.createReader( + new InputRowSchema( + timestampSpec, + new DimensionsSpec( + Lists.newArrayList( + new StringDimensionSchema("event"), + new StringDimensionSchema("id"), + new StringDimensionSchema("someOtherId"), + new StringDimensionSchema("isValid"), + new StringDimensionSchema("eventType"), + new NestedDataDimensionSchema("foo"), + new NestedDataDimensionSchema("bar") + ) + ), + null + ), + entity, + null + ); + + TransformSpec transformSpec = new TransformSpec( + null, + Lists.newArrayList( + new ExpressionTransform("foobar", "JSON_VALUE(foo, '$.bar')", TestExprMacroTable.INSTANCE), + new ExpressionTransform("bar0", "JSON_VALUE(bar, '$[0].bar')", TestExprMacroTable.INSTANCE) + ) + ); + TransformingInputEntityReader transformingReader = new TransformingInputEntityReader( + reader, + transformSpec.toTransformer() + ); + + + InputRow row = transformingReader.read().next(); + + Assert.assertEquals( + ImmutableList.builder() + .add("event") + .add("id") + .add("someOtherId") + .add("isValid") + .add("eventType") + .add("foo") + .add("bar") + .build(), + row.getDimensions() + ); + + Assert.assertEquals(ImmutableMap.of("bar", "baz"), row.getRaw("foo")); + Assert.assertEquals( + 
ImmutableList.of(ImmutableMap.of("bar", "bar0"), ImmutableMap.of("bar", "bar1")), + row.getRaw("bar") + ); + ProtobufInputRowParserTest.verifyNestedData(row, dateTime); + + } + + @Test + public void testParseNestedDataTransformsOnly() throws Exception + { + ProtobufInputFormat protobufInputFormat = new ProtobufInputFormat( + JSONPathSpec.DEFAULT, + decoder + ); + + //create binary of proto test event + DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC()); + ProtoTestEventWrapper.ProtoTestEvent event = ProtobufInputRowParserTest.buildNestedData(dateTime); + + final ByteEntity entity = new ByteEntity(ProtobufInputRowParserTest.toByteBuffer(event)); + + InputEntityReader reader = protobufInputFormat.createReader( + new InputRowSchema( + timestampSpec, + new DimensionsSpec( + Lists.newArrayList( + new StringDimensionSchema("event"), + new StringDimensionSchema("id"), + new StringDimensionSchema("someOtherId"), + new StringDimensionSchema("isValid"), + new StringDimensionSchema("eventType") + ) + ), + null + ), + entity, + null + ); + + TransformSpec transformSpec = new TransformSpec( + null, + Lists.newArrayList( + new ExpressionTransform("foobar", "JSON_VALUE(foo, '$.bar')", TestExprMacroTable.INSTANCE), + new ExpressionTransform("bar0", "JSON_VALUE(bar, '$[0].bar')", TestExprMacroTable.INSTANCE) + ) + ); + TransformingInputEntityReader transformingReader = new TransformingInputEntityReader( + reader, + transformSpec.toTransformer() + ); + + + InputRow row = transformingReader.read().next(); + ProtobufInputRowParserTest.verifyNestedData(row, dateTime); + + } + @Test public void testParseFlatData() throws Exception { @@ -153,7 +380,11 @@ public class ProtobufInputFormatTest final ByteEntity entity = new ByteEntity(ProtobufInputRowParserTest.toByteBuffer(event)); - InputRow row = protobufInputFormat.createReader(new InputRowSchema(timestampSpec, dimensionsSpec, null), entity, null).read().next(); + InputRow row = protobufInputFormat.createReader( + new InputRowSchema(timestampSpec, dimensionsSpec, null), + entity, + null + ).read().next(); ProtobufInputRowParserTest.verifyFlatData(row, dateTime); } @@ -170,7 +401,11 @@ public class ProtobufInputFormatTest final ByteEntity entity = new ByteEntity(ProtobufInputRowParserTest.toByteBuffer(event)); - InputRow row = protobufInputFormat.createReader(new InputRowSchema(timestampSpec, dimensionsSpec, null), entity, null).read().next(); + InputRow row = protobufInputFormat.createReader( + new InputRowSchema(timestampSpec, dimensionsSpec, null), + entity, + null + ).read().next(); ProtobufInputRowParserTest.verifyNestedData(row, dateTime); } diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufReaderTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufReaderTest.java index eba0c8cb955..24807e64248 100644 --- a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufReaderTest.java +++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufReaderTest.java @@ -28,21 +28,15 @@ import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; import org.apache.druid.java.util.common.parsers.JSONPathFieldType; import org.apache.druid.java.util.common.parsers.JSONPathSpec; -import org.apache.druid.java.util.common.parsers.ParseException; import org.joda.time.DateTime; import 
org.joda.time.chrono.ISOChronology; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import java.nio.ByteBuffer; public class ProtobufReaderTest { - @Rule - public ExpectedException expectedException = ExpectedException.none(); - private InputRowSchema inputRowSchema; private InputRowSchema inputRowSchemaWithComplexTimestamp; private JSONPathSpec flattenSpec; @@ -129,8 +123,6 @@ public class ProtobufReaderTest @Test public void testParseFlatDataWithComplexTimestampWithDefaultFlattenSpec() throws Exception { - expectedException.expect(ParseException.class); - expectedException.expectMessage("is unparseable!"); ProtobufReader reader = new ProtobufReader( inputRowSchemaWithComplexTimestamp, null,