add protobuf flattener, direct to plain java conversion for faster flattening (#13519)

* add protobuf flattener, direct to plain java conversion for faster flattening, nested column tests
Clint Wylie 2022-12-09 12:24:21 -08:00 committed by GitHub
parent 4ebdfe226d
commit 7002ecd303
14 changed files with 1013 additions and 376 deletions
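
At a glance: ProtobufReader previously flattened each record by printing the protobuf Message to JSON text and re-parsing it with Jackson. This change converts the Message straight into plain Java Maps and Lists (the new ProtobufConverter) and flattens those (the new ProtobufFlattenerMaker), skipping the serialize/deserialize round trip. A before/after sketch assembled from the ProtobufReader hunks below; the two snippets are alternatives, not a compilable unit, and exception handling is elided:

    // before: Message -> JSON string -> JsonNode -> flattened row
    String json = JsonFormat.printer().print(intermediateRow);
    Map<String, Object> record = recordFlattener.flatten(OBJECT_MAPPER.readValue(json, JsonNode.class));

    // after: Message -> plain Java Map -> flattened row
    Map<String, Object> plainJava = ProtobufConverter.convertMessage(intermediateRow);
    Map<String, Object> record = recordFlattener.flatten(plainJava);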

FlattenerJsonProvider.java (new file)

@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.common.parsers;
import com.jayway.jsonpath.InvalidJsonException;
import com.jayway.jsonpath.spi.json.JsonProvider;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
public abstract class FlattenerJsonProvider implements JsonProvider
{
@Override
public Object createArray()
{
return new ArrayList<>();
}
@Override
public Object createMap()
{
return new LinkedHashMap<>();
}
@Override
public int length(final Object o)
{
if (o instanceof List) {
return ((List) o).size();
} else {
return 0;
}
}
@Override
public Iterable<?> toIterable(final Object o)
{
if (o instanceof List) {
return (List) o;
}
throw new UnsupportedOperationException(o.getClass().getName());
}
@Override
public Object getArrayIndex(final Object o, final int i)
{
if (o instanceof List) {
return ((List) o).get(i);
}
throw new UnsupportedOperationException(o.getClass().getName());
}
@Override
public void setArrayIndex(final Object o, final int i, final Object o1)
{
if (o instanceof List) {
final List list = (List) o;
if (list.size() == i) {
list.add(o1);
} else {
list.set(i, o1);
}
} else {
throw new UnsupportedOperationException(o.getClass().getName());
}
}
@Override
public void setProperty(final Object o, final Object o1, final Object o2)
{
if (o instanceof Map) {
((Map) o).put(o1, o2);
} else {
throw new UnsupportedOperationException(o.getClass().getName());
}
}
@Override
public void removeProperty(final Object o, final Object o1)
{
if (o instanceof Map) {
((Map) o).remove(o1);
} else {
throw new UnsupportedOperationException(o.getClass().getName());
}
}
@Override
@Deprecated
public Object getArrayIndex(final Object o, final int i, final boolean b)
{
throw new UnsupportedOperationException("Deprecated");
}
@Override
public Object parse(final String s) throws InvalidJsonException
{
throw new UnsupportedOperationException("Unused");
}
@Override
public Object parse(final InputStream inputStream, final String s) throws InvalidJsonException
{
throw new UnsupportedOperationException("Unused");
}
@Override
public String toJson(final Object o)
{
throw new UnsupportedOperationException("Unused");
}
@Override
public Object unwrap(final Object o)
{
return o;
}
}
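
The new abstract base collects the List/Map plumbing that jayway's JsonProvider interface requires, leaving only the four format-aware methods (isArray, isMap, getPropertyKeys, getMapValue) for subclasses. A hypothetical minimal subclass, just to illustrate the contract (MapOnlyJsonProvider is not part of this commit):

    import java.util.Collection;
    import java.util.List;
    import java.util.Map;

    public class MapOnlyJsonProvider extends FlattenerJsonProvider
    {
      @Override
      public boolean isArray(final Object o)
      {
        return o instanceof List;
      }

      @Override
      public boolean isMap(final Object o)
      {
        return o instanceof Map;
      }

      @Override
      @SuppressWarnings("unchecked")
      public Collection<String> getPropertyKeys(final Object o)
      {
        // assumes string keys, as the plain Java rows produced by this commit have
        return ((Map<String, ?>) o).keySet();
      }

      @Override
      public Object getMapValue(final Object o, final String s)
      {
        return ((Map<?, ?>) o).get(s);
      }
    }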

JSONFlattenerMaker.java

@@ -54,9 +54,9 @@ public class JSONFlattenerMaker implements ObjectFlatteners.FlattenerMaker<JsonN
.options(EnumSet.of(Option.SUPPRESS_EXCEPTIONS))
.build();
private final CharsetEncoder enc = StandardCharsets.UTF_8.newEncoder();
private final boolean keepNullValues;
private final CharsetEncoder enc = StandardCharsets.UTF_8.newEncoder();
public JSONFlattenerMaker(boolean keepNullValues)
{
@@ -66,7 +66,7 @@ public class JSONFlattenerMaker implements ObjectFlatteners.FlattenerMaker<JsonN
@Override
public Iterable<String> discoverRootFields(final JsonNode obj)
{
return FluentIterable.from(() -> obj.fields())
return FluentIterable.from(obj::fields)
.filter(
entry -> {
final JsonNode val = entry.getValue();
@@ -137,13 +137,13 @@ public class JSONFlattenerMaker implements ObjectFlatteners.FlattenerMaker<JsonN
public Object finalizeConversionForMap(Object o)
{
if (o instanceof JsonNode) {
return convertJsonNode((JsonNode) o);
return convertJsonNode((JsonNode) o, enc);
}
return o;
}
@Nullable
private Object convertJsonNode(JsonNode val)
public static Object convertJsonNode(JsonNode val, CharsetEncoder enc)
{
if (val == null || val.isNull()) {
return null;
@@ -158,7 +158,7 @@ public class JSONFlattenerMaker implements ObjectFlatteners.FlattenerMaker<JsonN
}
if (val.isTextual()) {
return charsetFix(val.asText());
return charsetFix(val.asText(), enc);
}
if (val.isBoolean()) {
@@ -175,7 +175,7 @@ public class JSONFlattenerMaker implements ObjectFlatteners.FlattenerMaker<JsonN
List<Object> newList = new ArrayList<>();
for (JsonNode entry : val) {
if (!entry.isNull()) {
newList.add(finalizeConversionForMap(entry));
newList.add(convertJsonNode(entry, enc));
}
}
return newList;
@@ -185,7 +185,7 @@ public class JSONFlattenerMaker implements ObjectFlatteners.FlattenerMaker<JsonN
Map<String, Object> newMap = new LinkedHashMap<>();
for (Iterator<Map.Entry<String, JsonNode>> it = val.fields(); it.hasNext(); ) {
Map.Entry<String, JsonNode> entry = it.next();
newMap.put(entry.getKey(), finalizeConversionForMap(entry.getValue()));
newMap.put(entry.getKey(), convertJsonNode(entry.getValue(), enc));
}
return newMap;
}
@@ -197,7 +197,7 @@ public class JSONFlattenerMaker implements ObjectFlatteners.FlattenerMaker<JsonN
}
@Nullable
private String charsetFix(String s)
private static String charsetFix(String s, CharsetEncoder enc)
{
if (s != null && !enc.canEncode(s)) {
// Some whacky characters are in this string (e.g. \uD900). These are problematic because they are decodeable
@@ -209,7 +209,7 @@ public class JSONFlattenerMaker implements ObjectFlatteners.FlattenerMaker<JsonN
}
}
private boolean isFlatList(JsonNode list)
private static boolean isFlatList(JsonNode list)
{
for (JsonNode obj : list) {
if (obj.isObject() || obj.isArray()) {
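
The JSONFlattenerMaker changes above are about reuse: convertJsonNode and charsetFix become static and take the CharsetEncoder as an argument, so other flattener makers can share the JsonNode-to-plain-Java conversion. This is how the new ProtobufFlattenerMaker further down calls it for its jq path (someJsonNode is a placeholder for any Jackson JsonNode):

    CharsetEncoder enc = StandardCharsets.UTF_8.newEncoder();
    Object plain = JSONFlattenerMaker.convertJsonNode(someJsonNode, enc);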

FlattenerJsonProviderTest.java (new file)

@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.common.parsers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.StringUtils;
import org.junit.Assert;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
public class FlattenerJsonProviderTest
{
FlattenerJsonProvider jsonProvider = new FlattenerJsonProvider()
{
@Override
public boolean isArray(final Object o)
{
throw new RuntimeException("not tested");
}
@Override
public boolean isMap(final Object o)
{
throw new RuntimeException("not tested");
}
@Override
public Collection<String> getPropertyKeys(final Object o)
{
throw new RuntimeException("not tested");
}
@Override
public Object getMapValue(final Object o, final String s)
{
throw new RuntimeException("not tested");
}
};
@Test
public void testMapStuff()
{
Object aMap = jsonProvider.createMap();
jsonProvider.setProperty(aMap, "key", "value");
Assert.assertEquals(ImmutableMap.of("key", "value"), aMap);
jsonProvider.removeProperty(aMap, "key");
Assert.assertEquals(ImmutableMap.of(), aMap);
Assert.assertEquals(aMap, jsonProvider.unwrap(aMap));
Assert.assertThrows(
UnsupportedOperationException.class,
() -> jsonProvider.setProperty(jsonProvider.createArray(), "key", "value")
);
Assert.assertThrows(
UnsupportedOperationException.class,
() -> jsonProvider.removeProperty(jsonProvider.createArray(), "key")
);
}
@Test
public void testArrayStuff()
{
Object aList = jsonProvider.createArray();
jsonProvider.setArrayIndex(aList, 0, "a");
jsonProvider.setArrayIndex(aList, 1, "b");
jsonProvider.setArrayIndex(aList, 2, "c");
Assert.assertEquals(3, jsonProvider.length(aList));
Assert.assertEquals("a", jsonProvider.getArrayIndex(aList, 0));
Assert.assertEquals("b", jsonProvider.getArrayIndex(aList, 1));
Assert.assertEquals("c", jsonProvider.getArrayIndex(aList, 2));
List<String> expected = ImmutableList.of("a", "b", "c");
Assert.assertEquals(expected, aList);
Iterator<?> iter = jsonProvider.toIterable(aList).iterator();
Iterator<String> expectedIter = expected.iterator();
while (iter.hasNext()) {
Assert.assertEquals(expectedIter.next(), iter.next());
}
Assert.assertFalse(expectedIter.hasNext());
Assert.assertEquals(aList, jsonProvider.unwrap(aList));
Assert.assertThrows(
UnsupportedOperationException.class,
() -> jsonProvider.getArrayIndex(jsonProvider.createMap(), 0)
);
Assert.assertThrows(
UnsupportedOperationException.class,
() -> jsonProvider.setArrayIndex(jsonProvider.createMap(), 0, "a")
);
Assert.assertThrows(
UnsupportedOperationException.class,
() -> jsonProvider.toIterable(jsonProvider.createMap())
);
}
@Test
public void testNotImplementedOnPurpose()
{
Object aList = jsonProvider.createArray();
Throwable t = Assert.assertThrows(
UnsupportedOperationException.class,
() -> jsonProvider.toJson(aList)
);
Assert.assertEquals("Unused", t.getMessage());
t = Assert.assertThrows(
UnsupportedOperationException.class,
() -> jsonProvider.parse("{}")
);
Assert.assertEquals("Unused", t.getMessage());
t = Assert.assertThrows(
UnsupportedOperationException.class,
() -> jsonProvider.parse(new ByteArrayInputStream(StringUtils.toUtf8("{}")), "UTF-8")
);
Assert.assertEquals("Unused", t.getMessage());
t = Assert.assertThrows(
UnsupportedOperationException.class,
() -> jsonProvider.getArrayIndex(aList, 0, false)
);
Assert.assertEquals("Deprecated", t.getMessage());
}
}

GenericAvroJsonProvider.java

@@ -20,22 +20,17 @@
package org.apache.druid.data.input.avro;
import com.google.common.collect.ImmutableMap;
import com.jayway.jsonpath.InvalidJsonException;
import com.jayway.jsonpath.spi.json.JsonProvider;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericEnumSymbol;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.druid.java.util.common.parsers.FlattenerJsonProvider;
import javax.annotation.Nullable;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -43,7 +38,7 @@ import java.util.stream.Collectors;
/**
* JsonProvider for JsonPath + Avro.
*/
public class GenericAvroJsonProvider implements JsonProvider
public class GenericAvroJsonProvider extends FlattenerJsonProvider
{
private final boolean extractUnionsByType;
@@ -52,36 +47,6 @@ public class GenericAvroJsonProvider implements JsonProvider
this.extractUnionsByType = extractUnionsByType;
}
@Override
public Object parse(final String s) throws InvalidJsonException
{
throw new UnsupportedOperationException("Unused");
}
@Override
public Object parse(final InputStream inputStream, final String s) throws InvalidJsonException
{
throw new UnsupportedOperationException("Unused");
}
@Override
public String toJson(final Object o)
{
throw new UnsupportedOperationException("Unused");
}
@Override
public Object createArray()
{
return new ArrayList<>();
}
@Override
public Object createMap()
{
return new HashMap<>();
}
@Override
public boolean isArray(final Object o)
{
@@ -100,16 +65,6 @@ public class GenericAvroJsonProvider implements JsonProvider
}
}
@Override
public Iterable<?> toIterable(final Object o)
{
if (o instanceof List) {
return (List) o;
} else {
throw new UnsupportedOperationException();
}
}
@Override
public Collection<String> getPropertyKeys(final Object o)
{
@@ -124,34 +79,6 @@ public class GenericAvroJsonProvider implements JsonProvider
}
}
@Override
public Object getArrayIndex(final Object o, final int i)
{
return ((List) o).get(i);
}
@Override
@Deprecated
public Object getArrayIndex(final Object o, final int i, final boolean b)
{
throw new UnsupportedOperationException("Deprecated");
}
@Override
public void setArrayIndex(final Object o, final int i, final Object o1)
{
if (o instanceof List) {
final List list = (List) o;
if (list.size() == i) {
list.add(o1);
} else {
list.set(i, o1);
}
} else {
throw new UnsupportedOperationException();
}
}
@Nullable
@Override
public Object getMapValue(final Object o, final String s)
@@ -189,28 +116,12 @@ public class GenericAvroJsonProvider implements JsonProvider
}
}
@Override
public void removeProperty(final Object o, final Object o1)
{
if (o instanceof Map) {
((Map) o).remove(o1);
} else {
throw new UnsupportedOperationException();
}
}
@Override
public boolean isMap(final Object o)
{
return o == null || o instanceof Map || o instanceof GenericRecord;
}
@Override
public Object unwrap(final Object o)
{
return o;
}
private boolean isExtractableUnion(final Schema.Field field)
{
return field.schema().isUnion() &&

OrcStructJsonProvider.java

@@ -19,22 +19,18 @@
package org.apache.druid.data.input.orc;
import com.jayway.jsonpath.InvalidJsonException;
import com.jayway.jsonpath.spi.json.JsonProvider;
import org.apache.druid.java.util.common.parsers.FlattenerJsonProvider;
import org.apache.hadoop.io.Text;
import org.apache.orc.mapred.OrcMap;
import org.apache.orc.mapred.OrcStruct;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class OrcStructJsonProvider implements JsonProvider
public class OrcStructJsonProvider extends FlattenerJsonProvider
{
private final OrcStructConverter converter;
@@ -43,18 +39,6 @@
this.converter = converter;
}
@Override
public Object createArray()
{
return new ArrayList<>();
}
@Override
public Object createMap()
{
return new HashMap<>();
}
@Override
public boolean isArray(final Object o)
{
@@ -67,25 +51,6 @@
return o == null || o instanceof Map || o instanceof OrcStruct;
}
@Override
public int length(final Object o)
{
if (o instanceof List) {
return ((List) o).size();
} else {
return 0;
}
}
@Override
public Iterable<?> toIterable(final Object o)
{
if (o instanceof List) {
return (List) o;
}
throw new UnsupportedOperationException(o.getClass().getName());
}
@Override
public Collection<String> getPropertyKeys(final Object o)
{
@@ -117,79 +82,4 @@
}
throw new UnsupportedOperationException(o.getClass().getName());
}
@Override
public Object getArrayIndex(final Object o, final int i)
{
if (o instanceof List) {
return ((List) o).get(i);
}
throw new UnsupportedOperationException(o.getClass().getName());
}
@Override
public void setArrayIndex(final Object o, final int i, final Object o1)
{
if (o instanceof List) {
final List list = (List) o;
if (list.size() == i) {
list.add(o1);
} else {
list.set(i, o1);
}
} else {
throw new UnsupportedOperationException(o.getClass().getName());
}
}
@Override
public void setProperty(final Object o, final Object o1, final Object o2)
{
if (o instanceof Map) {
((Map) o).put(o1, o2);
} else {
throw new UnsupportedOperationException(o.getClass().getName());
}
}
@Override
public void removeProperty(final Object o, final Object o1)
{
if (o instanceof Map) {
((Map) o).remove(o1);
} else {
throw new UnsupportedOperationException(o.getClass().getName());
}
}
@Override
@Deprecated
public Object getArrayIndex(final Object o, final int i, final boolean b)
{
throw new UnsupportedOperationException("Deprecated");
}
@Override
public Object parse(final String s) throws InvalidJsonException
{
throw new UnsupportedOperationException("Unused");
}
@Override
public Object parse(final InputStream inputStream, final String s) throws InvalidJsonException
{
throw new UnsupportedOperationException("Unused");
}
@Override
public String toJson(final Object o)
{
throw new UnsupportedOperationException("Unused");
}
@Override
public Object unwrap(final Object o)
{
return o;
}
}

ParquetGroupJsonProvider.java

@@ -19,15 +19,11 @@
package org.apache.druid.data.input.parquet.simple;
import com.jayway.jsonpath.InvalidJsonException;
import com.jayway.jsonpath.spi.json.JsonProvider;
import org.apache.druid.java.util.common.parsers.FlattenerJsonProvider;
import org.apache.parquet.example.data.Group;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -35,7 +31,7 @@ import java.util.stream.Collectors;
/**
* Provides json path for Parquet {@link Group} objects
*/
public class ParquetGroupJsonProvider implements JsonProvider
public class ParquetGroupJsonProvider extends FlattenerJsonProvider
{
private final ParquetGroupConverter converter;
@@ -44,18 +40,6 @@
this.converter = converter;
}
@Override
public Object createArray()
{
return new ArrayList<>();
}
@Override
public Object createMap()
{
return new HashMap<>();
}
@Override
public boolean isArray(final Object o)
{
@@ -82,15 +66,6 @@
}
}
@Override
public Iterable<?> toIterable(final Object o)
{
if (o instanceof List) {
return (List) o;
}
throw new UnsupportedOperationException(o.getClass().getName());
}
@Override
public Collection<String> getPropertyKeys(final Object o)
{
@@ -118,80 +93,5 @@
}
throw new UnsupportedOperationException(o.getClass().getName());
}
@Override
public Object getArrayIndex(final Object o, final int i)
{
if (o instanceof List) {
return ((List) o).get(i);
}
throw new UnsupportedOperationException(o.getClass().getName());
}
@Override
public void setArrayIndex(final Object o, final int i, final Object o1)
{
if (o instanceof List) {
final List list = (List) o;
if (list.size() == i) {
list.add(o1);
} else {
list.set(i, o1);
}
} else {
throw new UnsupportedOperationException(o.getClass().getName());
}
}
@Override
public void setProperty(final Object o, final Object o1, final Object o2)
{
if (o instanceof Map) {
((Map) o).put(o1, o2);
} else {
throw new UnsupportedOperationException(o.getClass().getName());
}
}
@Override
public void removeProperty(final Object o, final Object o1)
{
if (o instanceof Map) {
((Map) o).remove(o1);
} else {
throw new UnsupportedOperationException(o.getClass().getName());
}
}
@Override
@Deprecated
public Object getArrayIndex(final Object o, final int i, final boolean b)
{
throw new UnsupportedOperationException("Deprecated");
}
@Override
public Object parse(final String s) throws InvalidJsonException
{
throw new UnsupportedOperationException("Unused");
}
@Override
public Object parse(final InputStream inputStream, final String s) throws InvalidJsonException
{
throw new UnsupportedOperationException("Unused");
}
@Override
public String toJson(final Object o)
{
throw new UnsupportedOperationException("Unused");
}
@Override
public Object unwrap(final Object o)
{
return o;
}
}

pom.xml (protobuf extension)

@@ -52,6 +52,12 @@
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
@@ -133,13 +139,22 @@
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<version>2.0.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>net.thisptr</groupId>
<artifactId>jackson-jq</artifactId>
<scope>provided</scope>
</dependency>
<!-- test -->
<dependency>
@@ -161,7 +176,7 @@
<groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>nl.jqno.equalsverifier</groupId>

ProtobufConverter.java (new file)

@@ -0,0 +1,244 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.protobuf;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.protobuf.Any;
import com.google.protobuf.BoolValue;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DoubleValue;
import com.google.protobuf.Duration;
import com.google.protobuf.FieldMask;
import com.google.protobuf.FloatValue;
import com.google.protobuf.Int32Value;
import com.google.protobuf.Int64Value;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.ListValue;
import com.google.protobuf.Message;
import com.google.protobuf.StringValue;
import com.google.protobuf.Struct;
import com.google.protobuf.Timestamp;
import com.google.protobuf.UInt32Value;
import com.google.protobuf.UInt64Value;
import com.google.protobuf.Value;
import com.google.protobuf.util.Durations;
import com.google.protobuf.util.FieldMaskUtil;
import com.google.protobuf.util.JsonFormat;
import com.google.protobuf.util.Timestamps;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Convert {@link Message} to plain Java objects, based roughly on the conversions done by {@link JsonFormat}
*/
public class ProtobufConverter
{
private static final Map<String, SpecializedConverter> SPECIAL_CONVERSIONS = buildSpecializedConversions();
@Nullable
public static Map<String, Object> convertMessage(Message msg) throws InvalidProtocolBufferException
{
if (msg == null) {
return null;
}
final Map<Descriptors.FieldDescriptor, Object> fields = msg.getAllFields();
final Map<String, Object> converted = Maps.newHashMapWithExpectedSize(fields.size());
for (Map.Entry<Descriptors.FieldDescriptor, Object> field : fields.entrySet()) {
converted.put(field.getKey().getJsonName(), convertField(field.getKey(), field.getValue()));
}
return converted;
}
@Nullable
private static Object convertField(Descriptors.FieldDescriptor field, Object value)
throws InvalidProtocolBufferException
{
// handle special types
if (value instanceof Message) {
Message msg = (Message) value;
final String typeName = msg.getDescriptorForType().getFullName();
SpecializedConverter converter = SPECIAL_CONVERSIONS.get(typeName);
if (converter != null) {
return converter.convert(msg);
}
}
if (field.isMapField()) {
return convertMap(field, value);
} else if (field.isRepeated()) {
return convertList(field, (List<?>) value);
} else {
return convertSingleValue(field, value);
}
}
@Nonnull
private static List<Object> convertList(Descriptors.FieldDescriptor field, List<?> value)
throws InvalidProtocolBufferException
{
final List<Object> theList = Lists.newArrayListWithExpectedSize(value.size());
for (Object element : value) {
theList.add(convertSingleValue(field, element));
}
return theList;
}
@Nullable
private static Object convertMap(Descriptors.FieldDescriptor field, Object value)
throws InvalidProtocolBufferException
{
final Descriptors.Descriptor type = field.getMessageType();
final Descriptors.FieldDescriptor keyField = type.findFieldByName("key");
final Descriptors.FieldDescriptor valueField = type.findFieldByName("value");
if (keyField == null || valueField == null) {
throw new InvalidProtocolBufferException("Invalid map field.");
}
@SuppressWarnings("unchecked")
final List<Object> elements = (List<Object>) value;
final HashMap<String, Object> theMap = Maps.newHashMapWithExpectedSize(elements.size());
for (Object element : elements) {
Message entry = (Message) element;
theMap.put(
(String) convertSingleValue(keyField, entry.getField(keyField)),
convertSingleValue(valueField, entry.getField(valueField))
);
}
return theMap;
}
@Nullable
private static Object convertSingleValue(Descriptors.FieldDescriptor field, Object value)
throws InvalidProtocolBufferException
{
switch (field.getType()) {
case BYTES:
return ((ByteString) value).toByteArray();
case ENUM:
if ("google.protobuf.NullValue".equals(field.getEnumType().getFullName())) {
return null;
} else {
return ((Descriptors.EnumValueDescriptor) value).getName();
}
case MESSAGE:
case GROUP:
return convertMessage((Message) value);
default:
// pass through everything else
return value;
}
}
private static Map<String, SpecializedConverter> buildSpecializedConversions()
{
final Map<String, SpecializedConverter> converters = new HashMap<>();
final SpecializedConverter parappaTheWrappa = msg -> {
final Descriptors.Descriptor descriptor = msg.getDescriptorForType();
final Descriptors.FieldDescriptor valueField = descriptor.findFieldByName("value");
if (valueField == null) {
throw new InvalidProtocolBufferException("Invalid Wrapper type.");
}
return convertSingleValue(valueField, msg.getField(valueField));
};
converters.put(BoolValue.getDescriptor().getFullName(), parappaTheWrappa);
converters.put(Int32Value.getDescriptor().getFullName(), parappaTheWrappa);
converters.put(UInt32Value.getDescriptor().getFullName(), parappaTheWrappa);
converters.put(Int64Value.getDescriptor().getFullName(), parappaTheWrappa);
converters.put(UInt64Value.getDescriptor().getFullName(), parappaTheWrappa);
converters.put(StringValue.getDescriptor().getFullName(), parappaTheWrappa);
converters.put(BytesValue.getDescriptor().getFullName(), parappaTheWrappa);
converters.put(FloatValue.getDescriptor().getFullName(), parappaTheWrappa);
converters.put(DoubleValue.getDescriptor().getFullName(), parappaTheWrappa);
converters.put(
Any.getDescriptor().getFullName(),
msg -> JsonFormat.printer().print(msg) // meh
);
converters.put(
Timestamp.getDescriptor().getFullName(),
msg -> {
final Timestamp ts = Timestamp.parseFrom(msg.toByteString());
return Timestamps.toString(ts);
}
);
converters.put(
Duration.getDescriptor().getFullName(),
msg -> {
final Duration duration = Duration.parseFrom(msg.toByteString());
return Durations.toString(duration);
}
);
converters.put(
FieldMask.getDescriptor().getFullName(),
msg -> FieldMaskUtil.toJsonString(FieldMask.parseFrom(msg.toByteString()))
);
converters.put(
Struct.getDescriptor().getFullName(),
msg -> {
final Descriptors.Descriptor descriptor = msg.getDescriptorForType();
final Descriptors.FieldDescriptor field = descriptor.findFieldByName("fields");
if (field == null) {
throw new InvalidProtocolBufferException("Invalid Struct type.");
}
// Struct is formatted as a map object.
return convertSingleValue(field, msg.getField(field));
}
);
converters.put(
Value.getDescriptor().getFullName(),
msg -> {
final Map<Descriptors.FieldDescriptor, Object> fields = msg.getAllFields();
if (fields.isEmpty()) {
return null;
}
if (fields.size() != 1) {
throw new InvalidProtocolBufferException("Invalid Value type.");
}
final Map.Entry<Descriptors.FieldDescriptor, Object> entry = fields.entrySet().stream().findFirst().get();
return convertSingleValue(entry.getKey(), entry.getValue());
}
);
converters.put(
ListValue.getDescriptor().getFullName(),
msg -> {
Descriptors.Descriptor descriptor = msg.getDescriptorForType();
Descriptors.FieldDescriptor field = descriptor.findFieldByName("values");
if (field == null) {
throw new InvalidProtocolBufferException("Invalid ListValue type.");
}
return convertList(field, (List<?>) msg.getField(field));
}
);
return converters;
}
@FunctionalInterface
interface SpecializedConverter
{
@Nullable
Object convert(Message msg) throws InvalidProtocolBufferException;
}
}
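
A usage sketch for the converter; descriptor and bytes are assumed inputs (in ProtobufReader below the message arrives already decoded). Well-known types get roughly the JsonFormat treatment: wrapper messages unwrap to their value, Timestamp/Duration/FieldMask render as strings, Struct/Value/ListValue become Maps, Lists, and nulls, and BYTES fields become byte[]:

    // hypothetical: decode a message dynamically, then convert to plain Java
    DynamicMessage msg = DynamicMessage.parseFrom(descriptor, bytes);
    Map<String, Object> plain = ProtobufConverter.convertMessage(msg); // throws InvalidProtocolBufferException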

ProtobufFlattenerMaker.java (new file)

@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.protobuf;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import com.jayway.jsonpath.spi.json.JsonProvider;
import net.thisptr.jackson.jq.JsonQuery;
import net.thisptr.jackson.jq.exception.JsonQueryException;
import org.apache.druid.java.util.common.parsers.JSONFlattenerMaker;
import org.apache.druid.java.util.common.parsers.NotImplementedMappingProvider;
import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.StandardCharsets;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
/**
* Basically a plain java object {@link ObjectFlatteners.FlattenerMaker}, but it lives here for now...
*/
public class ProtobufFlattenerMaker implements ObjectFlatteners.FlattenerMaker<Map<String, Object>>
{
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final ProtobufJsonProvider JSON_PROVIDER = new ProtobufJsonProvider();
private static final Configuration CONFIG = Configuration.builder()
.jsonProvider(JSON_PROVIDER)
.mappingProvider(new NotImplementedMappingProvider())
.options(EnumSet.of(Option.SUPPRESS_EXCEPTIONS))
.build();
private final CharsetEncoder enc = StandardCharsets.UTF_8.newEncoder();
@Override
public JsonProvider getJsonProvider()
{
return JSON_PROVIDER;
}
@Override
public Iterable<String> discoverRootFields(Map<String, Object> obj)
{
// in the future we can just return obj.keySet(), but for now this doesn't expect nested fields...
Set<String> rootFields = Sets.newHashSetWithExpectedSize(obj.keySet().size());
for (Map.Entry<String, Object> entry : obj.entrySet()) {
if (entry.getValue() instanceof List || entry.getValue() instanceof Map) {
continue;
}
rootFields.add(entry.getKey());
}
return rootFields;
}
@Override
public Object getRootField(Map<String, Object> obj, String key)
{
return obj.get(key);
}
@Override
public Function<Map<String, Object>, Object> makeJsonPathExtractor(String expr)
{
final JsonPath path = JsonPath.compile(expr);
return map -> path.read(map, CONFIG);
}
@Override
public Function<Map<String, Object>, Object> makeJsonQueryExtractor(String expr)
{
final JsonQuery jsonQuery;
try {
jsonQuery = JsonQuery.compile(expr);
}
catch (JsonQueryException e) {
throw new RuntimeException(e);
}
return map -> {
try {
return JSONFlattenerMaker.convertJsonNode(
jsonQuery.apply((JsonNode) OBJECT_MAPPER.valueToTree(map)).get(0),
enc
);
}
catch (JsonQueryException e) {
throw new RuntimeException(e);
}
};
}
}
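
Wired together, as ProtobufReader does below, the maker plugs into the generic ObjectFlatteners machinery, so root, path, and jq fields in a JSONPathSpec now evaluate against plain Java objects instead of JsonNodes. A sketch, with flattenSpec and msg assumed:

    ObjectFlattener<Map<String, Object>> flattener =
        ObjectFlatteners.create(flattenSpec, new ProtobufFlattenerMaker());
    Map<String, Object> row = flattener.flatten(ProtobufConverter.convertMessage(msg));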

ProtobufInputFormat.java

@@ -28,7 +28,6 @@ import org.apache.druid.data.input.impl.NestedInputFormat;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import javax.annotation.Nullable;
import java.io.File;
import java.util.Objects;
@@ -61,12 +60,7 @@ public class ProtobufInputFormat extends NestedInputFormat
@Override
public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
{
return new ProtobufReader(
inputRowSchema,
source,
protobufBytesDecoder,
getFlattenSpec()
);
return new ProtobufReader(inputRowSchema, source, protobufBytesDecoder, getFlattenSpec());
}
@Override
@@ -88,5 +82,4 @@ public class ProtobufInputFormat extends NestedInputFormat
{
return Objects.hash(protobufBytesDecoder, getFlattenSpec());
}
}

ProtobufJsonProvider.java (new file)

@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.protobuf;
import org.apache.druid.java.util.common.parsers.FlattenerJsonProvider;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Basically a plain java object {@link FlattenerJsonProvider}, but it lives here for now...
*/
public class ProtobufJsonProvider extends FlattenerJsonProvider
{
@Override
public boolean isArray(final Object o)
{
return o instanceof List;
}
@Override
public boolean isMap(final Object o)
{
return o == null || o instanceof Map;
}
@Override
public int length(final Object o)
{
if (o instanceof List) {
return ((List<?>) o).size();
} else {
return 0;
}
}
@Override
public Collection<String> getPropertyKeys(final Object o)
{
if (o == null) {
return Collections.emptySet();
} else if (o instanceof Map) {
return ((Map<?, ?>) o).keySet().stream().map(String::valueOf).collect(Collectors.toSet());
} else {
throw new UnsupportedOperationException(o.getClass().getName());
}
}
@Override
public Object getMapValue(final Object o, final String s)
{
if (o == null) {
return null;
} else if (o instanceof Map) {
return ((Map<?, ?>) o).get(s);
}
throw new UnsupportedOperationException(o.getClass().getName());
}
}
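
Since the provider walks Maps and Lists directly, JsonPath expressions evaluate with no JSON text involved. A small illustrative sketch of what the CONFIG in ProtobufFlattenerMaker enables (the map literal here is made up):

    Configuration conf = Configuration.builder()
        .jsonProvider(new ProtobufJsonProvider())
        .mappingProvider(new NotImplementedMappingProvider())
        .options(EnumSet.of(Option.SUPPRESS_EXCEPTIONS))
        .build();
    // reads "baz" out of {"foo": {"bar": "baz"}} without serializing anything
    Object bar = JsonPath.compile("$.foo.bar")
                         .read(ImmutableMap.of("foo", ImmutableMap.of("bar", "baz")), conf);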

ProtobufReader.java

@@ -19,13 +19,10 @@
package org.apache.druid.data.input.protobuf;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterators;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import com.google.protobuf.Message;
import org.apache.commons.io.IOUtils;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputRow;
@@ -34,12 +31,10 @@ import org.apache.druid.data.input.IntermediateRowParsingReader;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.JSONFlattenerMaker;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.common.parsers.ObjectFlattener;
import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.utils.CollectionUtils;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -49,11 +44,9 @@ import java.util.Map;
public class ProtobufReader extends IntermediateRowParsingReader<DynamicMessage>
{
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final InputRowSchema inputRowSchema;
private final InputEntity source;
private final JSONPathSpec flattenSpec;
private final ObjectFlattener<JsonNode> recordFlattener;
private final ObjectFlattener<Map<String, Object>> recordFlattener;
private final ProtobufBytesDecoder protobufBytesDecoder;
ProtobufReader(
@@ -63,17 +56,10 @@ public class ProtobufReader extends IntermediateRowParsingReader<DynamicMessage>
JSONPathSpec flattenSpec
)
{
if (flattenSpec == null) {
this.inputRowSchema = new ProtobufInputRowSchema(inputRowSchema);
this.recordFlattener = null;
} else {
this.inputRowSchema = inputRowSchema;
this.recordFlattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(true));
}
this.inputRowSchema = inputRowSchema;
this.recordFlattener = ObjectFlatteners.create(flattenSpec, new ProtobufFlattenerMaker());
this.source = source;
this.protobufBytesDecoder = protobufBytesDecoder;
this.flattenSpec = flattenSpec;
}
@Override
@@ -91,33 +77,27 @@ public class ProtobufReader extends IntermediateRowParsingReader<DynamicMessage>
}
@Override
protected List<InputRow> parseInputRows(DynamicMessage intermediateRow) throws ParseException, JsonProcessingException
protected List<InputRow> parseInputRows(DynamicMessage intermediateRow) throws ParseException
{
Map<String, Object> record;
if (flattenSpec == null || JSONPathSpec.DEFAULT.equals(flattenSpec)) {
try {
record = CollectionUtils.mapKeys(intermediateRow.getAllFields(), k -> k.getJsonName());
}
catch (Exception ex) {
throw new ParseException(null, ex, "Protobuf message could not be parsed");
}
} else {
try {
String json = JsonFormat.printer().print(intermediateRow);
record = recordFlattener.flatten(OBJECT_MAPPER.readValue(json, JsonNode.class));
}
catch (InvalidProtocolBufferException e) {
throw new ParseException(null, e, "Protobuf message could not be parsed");
}
}
final Map<String, Object> record;
final Map<String, Object> plainJava = convertMessage(intermediateRow);
record = recordFlattener.flatten(plainJava);
return Collections.singletonList(MapInputRowParser.parse(inputRowSchema, record));
}
@Override
protected List<Map<String, Object>> toMap(DynamicMessage intermediateRow) throws JsonProcessingException, InvalidProtocolBufferException
protected List<Map<String, Object>> toMap(DynamicMessage intermediateRow)
{
return Collections.singletonList(new ObjectMapper().readValue(JsonFormat.printer().print(intermediateRow), Map.class));
return Collections.singletonList(convertMessage(intermediateRow));
}
private static Map<String, Object> convertMessage(Message msg)
{
try {
return ProtobufConverter.convertMessage(msg);
}
catch (InvalidProtocolBufferException e) {
throw new ParseException(null, e, "Protobuf message could not be parsed");
}
}
}
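
Callers use the reader exactly as before; only the intermediate representation changed. The pattern from the tests below (decoder is a configured ProtobufBytesDecoder, entity wraps one serialized message, IOException handling elided):

    ProtobufInputFormat format = new ProtobufInputFormat(flattenSpec, decoder);
    InputRow row = format
        .createReader(new InputRowSchema(timestampSpec, dimensionsSpec, null), entity, null)
        .read()
        .next();

Also worth noting: the flattenSpec == null branch that used ProtobufInputRowSchema is removed, so every read now goes through the flattener path.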

ProtobufInputFormatTest.java

@@ -22,8 +22,12 @@ package org.apache.druid.data.input.protobuf;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.ByteEntity;
@@ -36,6 +40,12 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.math.expr.ExpressionProcessing;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.segment.NestedDataDimensionSchema;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.segment.transform.TransformingInputEntityReader;
import org.joda.time.DateTime;
import org.joda.time.chrono.ISOChronology;
import org.junit.Assert;
@@ -46,6 +56,7 @@ import org.junit.rules.ExpectedException;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
public class ProtobufInputFormatTest
{
@@ -63,6 +74,8 @@ public class ProtobufInputFormatTest
@Before
public void setUp() throws Exception
{
NullHandling.initializeForTests();
ExpressionProcessing.initializeForTests(null);
timestampSpec = new TimestampSpec("timestamp", "iso", null);
dimensionsSpec = new DimensionsSpec(Lists.newArrayList(
new StringDimensionSchema("event"),
@@ -125,7 +138,7 @@
}
@Test
public void testParseNestedData() throws Exception
public void testParseFlattenData() throws Exception
{
//configure parser with desc file
ProtobufInputFormat protobufInputFormat = new ProtobufInputFormat(flattenSpec, decoder);
@@ -136,11 +149,225 @@ public class ProtobufInputFormatTest
final ByteEntity entity = new ByteEntity(ProtobufInputRowParserTest.toByteBuffer(event));
InputRow row = protobufInputFormat.createReader(new InputRowSchema(timestampSpec, dimensionsSpec, null), entity, null).read().next();
InputRow row = protobufInputFormat.createReader(
new InputRowSchema(timestampSpec, dimensionsSpec, null),
entity,
null
).read().next();
Assert.assertEquals(
ImmutableList.builder()
.add("event")
.add("id")
.add("someOtherId")
.add("isValid")
.build(),
row.getDimensions()
);
ProtobufInputRowParserTest.verifyNestedData(row, dateTime);
}
@Test
public void testParseFlattenDataJq() throws Exception
{
//configure parser with desc file
ProtobufInputFormat protobufInputFormat = new ProtobufInputFormat(
new JSONPathSpec(
true,
Lists.newArrayList(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "eventType", "eventType"),
new JSONPathFieldSpec(JSONPathFieldType.JQ, "foobar", ".foo.bar"),
new JSONPathFieldSpec(JSONPathFieldType.JQ, "bar0", ".bar[0].bar")
)
),
decoder
);
//create binary of proto test event
DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC());
ProtoTestEventWrapper.ProtoTestEvent event = ProtobufInputRowParserTest.buildNestedData(dateTime);
final ByteEntity entity = new ByteEntity(ProtobufInputRowParserTest.toByteBuffer(event));
InputRow row = protobufInputFormat.createReader(
new InputRowSchema(timestampSpec, dimensionsSpec, null),
entity,
null
).read().next();
Assert.assertEquals(
ImmutableList.builder()
.add("event")
.add("id")
.add("someOtherId")
.add("isValid")
.build(),
row.getDimensions()
);
ProtobufInputRowParserTest.verifyNestedData(row, dateTime);
}
@Test
public void testParseFlattenDataDiscover() throws Exception
{
//configure parser with desc file
ProtobufInputFormat protobufInputFormat = new ProtobufInputFormat(flattenSpec, decoder);
//create binary of proto test event
DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC());
ProtoTestEventWrapper.ProtoTestEvent event = ProtobufInputRowParserTest.buildNestedData(dateTime);
final ByteEntity entity = new ByteEntity(ProtobufInputRowParserTest.toByteBuffer(event));
InputRow row = protobufInputFormat.createReader(
new InputRowSchema(timestampSpec, new DimensionsSpec(Collections.emptyList()), null),
entity,
null
).read().next();
Assert.assertEquals(
ImmutableList.builder()
.add("eventType")
.add("foobar")
.add("bar0")
.add("someOtherId")
.add("someIntColumn")
.add("isValid")
.add("description")
.add("someLongColumn")
.add("someFloatColumn")
.add("id")
.add("timestamp")
.build(),
row.getDimensions()
);
ProtobufInputRowParserTest.verifyNestedData(row, dateTime);
}
@Test
public void testParseNestedData() throws Exception
{
ProtobufInputFormat protobufInputFormat = new ProtobufInputFormat(
JSONPathSpec.DEFAULT,
decoder
);
//create binary of proto test event
DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC());
ProtoTestEventWrapper.ProtoTestEvent event = ProtobufInputRowParserTest.buildNestedData(dateTime);
final ByteEntity entity = new ByteEntity(ProtobufInputRowParserTest.toByteBuffer(event));
InputEntityReader reader = protobufInputFormat.createReader(
new InputRowSchema(
timestampSpec,
new DimensionsSpec(
Lists.newArrayList(
new StringDimensionSchema("event"),
new StringDimensionSchema("id"),
new StringDimensionSchema("someOtherId"),
new StringDimensionSchema("isValid"),
new StringDimensionSchema("eventType"),
new NestedDataDimensionSchema("foo"),
new NestedDataDimensionSchema("bar")
)
),
null
),
entity,
null
);
TransformSpec transformSpec = new TransformSpec(
null,
Lists.newArrayList(
new ExpressionTransform("foobar", "JSON_VALUE(foo, '$.bar')", TestExprMacroTable.INSTANCE),
new ExpressionTransform("bar0", "JSON_VALUE(bar, '$[0].bar')", TestExprMacroTable.INSTANCE)
)
);
TransformingInputEntityReader transformingReader = new TransformingInputEntityReader(
reader,
transformSpec.toTransformer()
);
InputRow row = transformingReader.read().next();
Assert.assertEquals(
ImmutableList.builder()
.add("event")
.add("id")
.add("someOtherId")
.add("isValid")
.add("eventType")
.add("foo")
.add("bar")
.build(),
row.getDimensions()
);
Assert.assertEquals(ImmutableMap.of("bar", "baz"), row.getRaw("foo"));
Assert.assertEquals(
ImmutableList.of(ImmutableMap.of("bar", "bar0"), ImmutableMap.of("bar", "bar1")),
row.getRaw("bar")
);
ProtobufInputRowParserTest.verifyNestedData(row, dateTime);
}
@Test
public void testParseNestedDataTransformsOnly() throws Exception
{
ProtobufInputFormat protobufInputFormat = new ProtobufInputFormat(
JSONPathSpec.DEFAULT,
decoder
);
//create binary of proto test event
DateTime dateTime = new DateTime(2012, 7, 12, 9, 30, ISOChronology.getInstanceUTC());
ProtoTestEventWrapper.ProtoTestEvent event = ProtobufInputRowParserTest.buildNestedData(dateTime);
final ByteEntity entity = new ByteEntity(ProtobufInputRowParserTest.toByteBuffer(event));
InputEntityReader reader = protobufInputFormat.createReader(
new InputRowSchema(
timestampSpec,
new DimensionsSpec(
Lists.newArrayList(
new StringDimensionSchema("event"),
new StringDimensionSchema("id"),
new StringDimensionSchema("someOtherId"),
new StringDimensionSchema("isValid"),
new StringDimensionSchema("eventType")
)
),
null
),
entity,
null
);
TransformSpec transformSpec = new TransformSpec(
null,
Lists.newArrayList(
new ExpressionTransform("foobar", "JSON_VALUE(foo, '$.bar')", TestExprMacroTable.INSTANCE),
new ExpressionTransform("bar0", "JSON_VALUE(bar, '$[0].bar')", TestExprMacroTable.INSTANCE)
)
);
TransformingInputEntityReader transformingReader = new TransformingInputEntityReader(
reader,
transformSpec.toTransformer()
);
InputRow row = transformingReader.read().next();
ProtobufInputRowParserTest.verifyNestedData(row, dateTime);
}
@Test
public void testParseFlatData() throws Exception
{
@@ -153,7 +380,11 @@ public class ProtobufInputFormatTest
final ByteEntity entity = new ByteEntity(ProtobufInputRowParserTest.toByteBuffer(event));
InputRow row = protobufInputFormat.createReader(new InputRowSchema(timestampSpec, dimensionsSpec, null), entity, null).read().next();
InputRow row = protobufInputFormat.createReader(
new InputRowSchema(timestampSpec, dimensionsSpec, null),
entity,
null
).read().next();
ProtobufInputRowParserTest.verifyFlatData(row, dateTime);
}
@@ -170,7 +401,11 @@ public class ProtobufInputFormatTest
final ByteEntity entity = new ByteEntity(ProtobufInputRowParserTest.toByteBuffer(event));
InputRow row = protobufInputFormat.createReader(new InputRowSchema(timestampSpec, dimensionsSpec, null), entity, null).read().next();
InputRow row = protobufInputFormat.createReader(
new InputRowSchema(timestampSpec, dimensionsSpec, null),
entity,
null
).read().next();
ProtobufInputRowParserTest.verifyNestedData(row, dateTime);
}

ProtobufReaderTest.java

@@ -28,21 +28,15 @@ import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.joda.time.DateTime;
import org.joda.time.chrono.ISOChronology;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.nio.ByteBuffer;
public class ProtobufReaderTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
private InputRowSchema inputRowSchema;
private InputRowSchema inputRowSchemaWithComplexTimestamp;
private JSONPathSpec flattenSpec;
@@ -129,8 +123,6 @@ public class ProtobufReaderTest
@Test
public void testParseFlatDataWithComplexTimestampWithDefaultFlattenSpec() throws Exception
{
expectedException.expect(ParseException.class);
expectedException.expectMessage("is unparseable!");
ProtobufReader reader = new ProtobufReader(
inputRowSchemaWithComplexTimestamp,
null,