add flag to flattenSpec to keep null columns (#9814)

* add flag to flattenSpec to keep null columns

* remove changes to inputFormat interface

* add comment

* change comment message

* update web console e2e test

* move keepNullColumns to JSONParseSpec

* fix merge conflicts

* fix tests

* set keepNullColumns to false by default

* fix lgtm

* change Boolean to boolean, add keepNullColumns to hash, add tests for keepNullColumns false + true with no null columns

* Add equals verifier tests
This commit is contained in:
mcbrewster 2020-05-08 22:53:39 -06:00 committed by GitHub
parent 339876b69d
commit 28be107a1c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
45 changed files with 305 additions and 63 deletions

View File

@ -57,6 +57,7 @@ public class FlattenJSONBenchmarkUtil
new TimestampSpec("ts", "iso", null),
new DimensionsSpec(null, null, null),
null,
null,
null
);
return spec.makeParser();
@ -71,6 +72,7 @@ public class FlattenJSONBenchmarkUtil
new TimestampSpec("ts", "iso", null),
new DimensionsSpec(null, null, null),
flattenSpec,
null,
null
);
@ -114,6 +116,7 @@ public class FlattenJSONBenchmarkUtil
new TimestampSpec("ts", "iso", null),
new DimensionsSpec(null, null, null),
flattenSpec,
null,
null
);
@ -157,6 +160,7 @@ public class FlattenJSONBenchmarkUtil
new TimestampSpec("ts", "iso", null),
new DimensionsSpec(null, null, null),
flattenSpec,
null,
null
);
@ -198,6 +202,7 @@ public class FlattenJSONBenchmarkUtil
new TimestampSpec("ts", "iso", null),
new DimensionsSpec(null, null, null),
flattenSpec,
null,
null
);

View File

@ -37,17 +37,20 @@ public class JSONParseSpec extends NestedDataParseSpec<JSONPathSpec>
{
private final ObjectMapper objectMapper;
private final Map<String, Boolean> featureSpec;
private final boolean keepNullColumns;
@JsonCreator
public JSONParseSpec(
@JsonProperty("timestampSpec") TimestampSpec timestampSpec,
@JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec,
@JsonProperty("flattenSpec") JSONPathSpec flattenSpec,
@JsonProperty("featureSpec") Map<String, Boolean> featureSpec
@JsonProperty("featureSpec") Map<String, Boolean> featureSpec,
@JsonProperty("keepNullColumns") Boolean keepNullColumns
)
{
super(timestampSpec, dimensionsSpec, flattenSpec != null ? flattenSpec : JSONPathSpec.DEFAULT);
this.objectMapper = new ObjectMapper();
this.keepNullColumns = (keepNullColumns == null) ? false : keepNullColumns;
this.featureSpec = (featureSpec == null) ? new HashMap<>() : featureSpec;
for (Map.Entry<String, Boolean> entry : this.featureSpec.entrySet()) {
Feature feature = Feature.valueOf(entry.getKey());
@ -55,28 +58,27 @@ public class JSONParseSpec extends NestedDataParseSpec<JSONPathSpec>
}
}
@Deprecated
public JSONParseSpec(TimestampSpec ts, DimensionsSpec dims)
{
this(ts, dims, null, null);
this(ts, dims, null, null, false);
}
@Override
public Parser<String, Object> makeParser()
{
return new JSONPathParser(getFlattenSpec(), objectMapper);
return new JSONPathParser(getFlattenSpec(), objectMapper, getKeepNullColumns());
}
@Override
public ParseSpec withTimestampSpec(TimestampSpec spec)
{
return new JSONParseSpec(spec, getDimensionsSpec(), getFlattenSpec(), getFeatureSpec());
return new JSONParseSpec(spec, getDimensionsSpec(), getFlattenSpec(), getFeatureSpec(), getKeepNullColumns());
}
@Override
public ParseSpec withDimensionsSpec(DimensionsSpec spec)
{
return new JSONParseSpec(getTimestampSpec(), spec, getFlattenSpec(), getFeatureSpec());
return new JSONParseSpec(getTimestampSpec(), spec, getFlattenSpec(), getFeatureSpec(), getKeepNullColumns());
}
@JsonProperty
@ -85,6 +87,12 @@ public class JSONParseSpec extends NestedDataParseSpec<JSONPathSpec>
return featureSpec;
}
@JsonProperty
public boolean getKeepNullColumns()
{
return keepNullColumns;
}
@Override
public boolean equals(final Object o)
{
@ -98,13 +106,13 @@ public class JSONParseSpec extends NestedDataParseSpec<JSONPathSpec>
return false;
}
final JSONParseSpec that = (JSONParseSpec) o;
return Objects.equals(featureSpec, that.featureSpec);
return Objects.equals(featureSpec, that.featureSpec) && Objects.equals(keepNullColumns, that.keepNullColumns);
}
@Override
public int hashCode()
{
return Objects.hash(super.hashCode(), featureSpec);
return Objects.hash(super.hashCode(), featureSpec, getKeepNullColumns());
}
@Override
@ -115,6 +123,7 @@ public class JSONParseSpec extends NestedDataParseSpec<JSONPathSpec>
", dimensionsSpec=" + getDimensionsSpec() +
", flattenSpec=" + getFlattenSpec() +
", featureSpec=" + featureSpec +
", keepNullColumns=" + keepNullColumns +
'}';
}
}

View File

@ -39,16 +39,19 @@ public class JsonInputFormat extends NestedInputFormat
{
private final Map<String, Boolean> featureSpec;
private final ObjectMapper objectMapper;
private final boolean keepNullColumns;
@JsonCreator
public JsonInputFormat(
@JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec,
@JsonProperty("featureSpec") @Nullable Map<String, Boolean> featureSpec
@JsonProperty("featureSpec") @Nullable Map<String, Boolean> featureSpec,
@JsonProperty("keepNullColumns") @Nullable Boolean keepNullColumns
)
{
super(flattenSpec);
this.featureSpec = featureSpec == null ? Collections.emptyMap() : featureSpec;
this.objectMapper = new ObjectMapper();
this.keepNullColumns = keepNullColumns == null ? false : keepNullColumns;
for (Entry<String, Boolean> entry : this.featureSpec.entrySet()) {
Feature feature = Feature.valueOf(entry.getKey());
objectMapper.configure(feature, entry.getValue());
@ -70,7 +73,7 @@ public class JsonInputFormat extends NestedInputFormat
@Override
public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
{
return new JsonReader(inputRowSchema, source, getFlattenSpec(), objectMapper);
return new JsonReader(inputRowSchema, source, getFlattenSpec(), objectMapper, keepNullColumns);
}
@Override
@ -86,12 +89,12 @@ public class JsonInputFormat extends NestedInputFormat
return false;
}
JsonInputFormat that = (JsonInputFormat) o;
return Objects.equals(featureSpec, that.featureSpec);
return Objects.equals(featureSpec, that.featureSpec) && Objects.equals(keepNullColumns, that.keepNullColumns);
}
@Override
public int hashCode()
{
return Objects.hash(super.hashCode(), featureSpec);
return Objects.hash(super.hashCode(), featureSpec, keepNullColumns);
}
}

View File

@ -45,11 +45,12 @@ public class JsonReader extends TextReader
InputRowSchema inputRowSchema,
InputEntity source,
JSONPathSpec flattenSpec,
ObjectMapper mapper
ObjectMapper mapper,
boolean keepNullColumns
)
{
super(inputRowSchema, source);
this.flattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker());
this.flattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(keepNullColumns));
this.mapper = mapper;
}

View File

@ -53,8 +53,15 @@ public class JSONFlattenerMaker implements ObjectFlatteners.FlattenerMaker<JsonN
.options(EnumSet.of(Option.SUPPRESS_EXCEPTIONS))
.build();
private final boolean keepNullValues;
private final CharsetEncoder enc = StandardCharsets.UTF_8.newEncoder();
public JSONFlattenerMaker(boolean keepNullValues)
{
this.keepNullValues = keepNullValues;
}
@Override
public Iterable<String> discoverRootFields(final JsonNode obj)
{
@ -62,7 +69,8 @@ public class JSONFlattenerMaker implements ObjectFlatteners.FlattenerMaker<JsonN
.filter(
entry -> {
final JsonNode val = entry.getValue();
return !(val.isObject() || val.isNull() || (val.isArray() && !isFlatList(val)));
// If keepNullValues is set on the JSONParseSpec, then null values should not be filtered out
return !(val.isObject() || (!keepNullValues && val.isNull()) || (val.isArray() && !isFlatList(val)));
}
)
.transform(Map.Entry::getKey);

View File

@ -32,17 +32,17 @@ public class JSONPathParser implements Parser<String, Object>
{
private final ObjectMapper mapper;
private final ObjectFlattener<JsonNode> flattener;
/**
* Constructor
*
* @param flattenSpec Provide a path spec for flattening and field discovery.
* @param mapper Optionally provide an ObjectMapper, used by the parser for reading the input JSON.
*/
public JSONPathParser(JSONPathSpec flattenSpec, ObjectMapper mapper)
public JSONPathParser(JSONPathSpec flattenSpec, ObjectMapper mapper, boolean keepNullColumns)
{
this.mapper = mapper == null ? new ObjectMapper() : mapper;
this.flattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker());
this.flattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(keepNullColumns));
}
@Override

View File

@ -52,6 +52,7 @@ public class InputRowParserSerdeTest
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("foo", "bar")), null, null),
null,
null,
null
),
null
@ -98,6 +99,7 @@ public class InputRowParserSerdeTest
null
),
null,
null,
null
)
);
@ -131,6 +133,7 @@ public class InputRowParserSerdeTest
null
),
null,
null,
null
)
);
@ -168,6 +171,7 @@ public class InputRowParserSerdeTest
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("foo", "bar")), null, null),
null,
null,
null
),
charset.name()
@ -208,6 +212,7 @@ public class InputRowParserSerdeTest
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(null, null, null),
flattenSpec,
null,
null
),
null

View File

@ -21,6 +21,7 @@ package org.apache.druid.data.input.impl;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.TestObjectMapper;
import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
@ -57,7 +58,8 @@ public class JSONParseSpecTest
new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2")
)
),
null
null,
false
);
final Map<String, Object> expected = new HashMap<>();
@ -95,7 +97,8 @@ public class JSONParseSpecTest
new JSONPathFieldSpec(JSONPathFieldType.PATH, "bar", "$.[?(@.something_else)].something_else.foo")
)
),
null
null,
false
);
final Map<String, Object> expected = new HashMap<>();
@ -119,7 +122,8 @@ public class JSONParseSpecTest
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo")), null, null),
null,
feature
feature,
false
);
final JSONParseSpec serde = (JSONParseSpec) jsonMapper.readValue(
@ -132,4 +136,23 @@ public class JSONParseSpecTest
Assert.assertEquals(Arrays.asList("bar", "foo"), serde.getDimensionsSpec().getDimensionNames());
Assert.assertEquals(feature, serde.getFeatureSpec());
}
@Test
public void testEquals()
{
EqualsVerifier.forClass(JSONParseSpec.class)
.usingGetClass()
.withPrefabValues(
DimensionsSpec.class,
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo")), null, null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("baz", "buzz")), null, null)
)
.withPrefabValues(
ObjectMapper.class,
new ObjectMapper(),
new ObjectMapper()
)
.withIgnoredFields("objectMapper")
.verify();
}
}

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.JsonParser.Feature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
@ -50,10 +51,25 @@ public class JsonInputFormatTest
new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2")
)
),
ImmutableMap.of(Feature.ALLOW_COMMENTS.name(), true, Feature.ALLOW_UNQUOTED_FIELD_NAMES.name(), false)
ImmutableMap.of(Feature.ALLOW_COMMENTS.name(), true, Feature.ALLOW_UNQUOTED_FIELD_NAMES.name(), false),
false
);
final byte[] bytes = mapper.writeValueAsBytes(format);
final JsonInputFormat fromJson = (JsonInputFormat) mapper.readValue(bytes, InputFormat.class);
Assert.assertEquals(format, fromJson);
}
@Test
public void testEquals()
{
EqualsVerifier.forClass(JsonInputFormat.class)
.usingGetClass()
.withPrefabValues(
ObjectMapper.class,
new ObjectMapper(),
new ObjectMapper()
)
.withIgnoredFields("objectMapper")
.verify();
}
}

View File

@ -34,6 +34,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
public class JsonReaderTest
@ -53,6 +54,7 @@ public class JsonReaderTest
new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2")
)
),
null,
null
);
@ -102,6 +104,7 @@ public class JsonReaderTest
new JSONPathFieldSpec(JSONPathFieldType.PATH, "bar", "$.[?(@.something_else)].something_else.foo")
)
),
null,
null
);
@ -132,4 +135,130 @@ public class JsonReaderTest
Assert.assertEquals(numExpectedIterations, numActualIterations);
}
}
@Test
public void testParseRowKeepNullColumns() throws IOException
{
final JsonInputFormat format = new JsonInputFormat(
new JSONPathSpec(
true,
ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg")
)
),
null,
true
);
final ByteEntity source = new ByteEntity(
StringUtils.toUtf8("{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"o\":{\"mg\":null}}")
);
final InputEntityReader reader = format.createReader(
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Collections.emptyList())),
Collections.emptyList()
),
source,
null
);
final int numExpectedIterations = 1;
try (CloseableIterator<InputRow> iterator = reader.read()) {
int numActualIterations = 0;
while (iterator.hasNext()) {
final InputRow row = iterator.next();
Assert.assertEquals(Arrays.asList("path_omg", "timestamp", "bar", "foo"), row.getDimensions());
Assert.assertTrue(row.getDimension("bar").isEmpty());
Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
Assert.assertTrue(row.getDimension("path_omg").isEmpty());
numActualIterations++;
}
Assert.assertEquals(numExpectedIterations, numActualIterations);
}
}
@Test
public void testKeepNullColumnsWithNoNullValues() throws IOException
{
final JsonInputFormat format = new JsonInputFormat(
new JSONPathSpec(
true,
ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg")
)
),
null,
true
);
final ByteEntity source = new ByteEntity(
StringUtils.toUtf8("{\"timestamp\":\"2019-01-01\",\"bar\":1,\"foo\":\"x\",\"o\":{\"mg\":\"a\"}}")
);
final InputEntityReader reader = format.createReader(
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Collections.emptyList())),
Collections.emptyList()
),
source,
null
);
final int numExpectedIterations = 1;
try (CloseableIterator<InputRow> iterator = reader.read()) {
int numActualIterations = 0;
while (iterator.hasNext()) {
final InputRow row = iterator.next();
Assert.assertEquals(Arrays.asList("path_omg", "timestamp", "bar", "foo"), row.getDimensions());
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("bar")));
Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
Assert.assertEquals("a", Iterables.getOnlyElement(row.getDimension("path_omg")));
numActualIterations++;
}
Assert.assertEquals(numExpectedIterations, numActualIterations);
}
}
@Test
public void testFalseKeepNullColumns() throws IOException
{
final JsonInputFormat format = new JsonInputFormat(
new JSONPathSpec(
true,
ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg")
)
),
null,
false
);
final ByteEntity source = new ByteEntity(
StringUtils.toUtf8("{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"o\":{\"mg\":\"a\"}}")
);
final InputEntityReader reader = format.createReader(
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Collections.emptyList())),
Collections.emptyList()
),
source,
null
);
final int numExpectedIterations = 1;
try (CloseableIterator<InputRow> iterator = reader.read()) {
int numActualIterations = 0;
while (iterator.hasNext()) {
final InputRow row = iterator.next();
Assert.assertEquals(Arrays.asList("path_omg", "timestamp", "foo"), row.getDimensions());
Assert.assertTrue(row.getDimension("bar").isEmpty());
Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
Assert.assertEquals("a", Iterables.getOnlyElement(row.getDimension("path_omg")));
numActualIterations++;
}
Assert.assertEquals(numExpectedIterations, numActualIterations);
}
}
}

View File

@ -56,7 +56,7 @@ public class JSONPathParserTest
public void testSimple()
{
List<JSONPathFieldSpec> fields = new ArrayList<>();
final Parser<String, Object> jsonParser = new JSONPathParser(new JSONPathSpec(true, fields), null);
final Parser<String, Object> jsonParser = new JSONPathParser(new JSONPathSpec(true, fields), null, false);
final Map<String, Object> jsonMap = jsonParser.parseToMap(JSON);
Assert.assertEquals(
"jsonMap",
@ -69,7 +69,7 @@ public class JSONPathParserTest
public void testWithNumbers()
{
List<JSONPathFieldSpec> fields = new ArrayList<>();
final Parser<String, Object> jsonParser = new JSONPathParser(new JSONPathSpec(true, fields), null);
final Parser<String, Object> jsonParser = new JSONPathParser(new JSONPathSpec(true, fields), null, false);
final Map<String, Object> jsonMap = jsonParser.parseToMap(NUMBERS_JSON);
Assert.assertEquals(
"jsonMap",
@ -82,7 +82,7 @@ public class JSONPathParserTest
public void testWithWhackyCharacters()
{
List<JSONPathFieldSpec> fields = new ArrayList<>();
final Parser<String, Object> jsonParser = new JSONPathParser(new JSONPathSpec(true, fields), null);
final Parser<String, Object> jsonParser = new JSONPathParser(new JSONPathSpec(true, fields), null, false);
final Map<String, Object> jsonMap = jsonParser.parseToMap(WHACKY_CHARACTER_JSON);
Assert.assertEquals(
"jsonMap",
@ -112,7 +112,7 @@ public class JSONPathParserTest
fields.add(new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq-met-array", ".met.a"));
final Parser<String, Object> jsonParser = new JSONPathParser(new JSONPathSpec(true, fields), null);
final Parser<String, Object> jsonParser = new JSONPathParser(new JSONPathSpec(true, fields), null, false);
final Map<String, Object> jsonMap = jsonParser.parseToMap(NESTED_JSON);
// Root fields
@ -173,7 +173,7 @@ public class JSONPathParserTest
fields.add(new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq-heybarx0", ".hey[0].barx"));
fields.add(new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq-met-array", ".met.a"));
final Parser<String, Object> jsonParser = new JSONPathParser(new JSONPathSpec(false, fields), null);
final Parser<String, Object> jsonParser = new JSONPathParser(new JSONPathSpec(false, fields), null, false);
final Map<String, Object> jsonMap = jsonParser.parseToMap(NESTED_JSON);
// Root fields
@ -210,7 +210,7 @@ public class JSONPathParserTest
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Cannot have duplicate field definition: met-array");
final Parser<String, Object> jsonParser = new JSONPathParser(new JSONPathSpec(false, fields), null);
final Parser<String, Object> jsonParser = new JSONPathParser(new JSONPathSpec(false, fields), null, false);
jsonParser.parseToMap(NESTED_JSON);
}
@ -224,7 +224,7 @@ public class JSONPathParserTest
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Cannot have duplicate field definition: met-array");
final Parser<String, Object> jsonParser = new JSONPathParser(new JSONPathSpec(false, fields), null);
final Parser<String, Object> jsonParser = new JSONPathParser(new JSONPathSpec(false, fields), null, false);
jsonParser.parseToMap(NESTED_JSON);
}
@ -236,7 +236,7 @@ public class JSONPathParserTest
thrown.expect(ParseException.class);
thrown.expectMessage("Unable to parse row [" + NOT_JSON + "]");
final Parser<String, Object> jsonParser = new JSONPathParser(new JSONPathSpec(true, fields), null);
final Parser<String, Object> jsonParser = new JSONPathParser(new JSONPathSpec(true, fields), null, false);
jsonParser.parseToMap(NOT_JSON);
}
}

View File

@ -68,7 +68,9 @@ public class ThriftInputRowParserTest
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "title", "title"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "lastName", "$.author.lastName")
)
), null
),
null,
null
);
}

View File

@ -148,7 +148,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, EXPECTED_URIS, ImmutableList.of(), null);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null),
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
null
);
Assert.assertEquals(EXPECTED_OBJECTS, splits.map(InputSplit::get).collect(Collectors.toList()));
@ -169,7 +169,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, null, PREFIXES, null);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null),
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new MaxSizeSplitHintSpec(1L) // set maxSplitSize to 1 so that each inputSplit has only one object
);
@ -191,7 +191,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, null, PREFIXES, null);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null),
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new MaxSizeSplitHintSpec(CONTENT.length * 3L)
);

View File

@ -126,7 +126,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
null,
new KafkaSupervisorIOConfig(
TOPIC,
new JsonInputFormat(JSONPathSpec.DEFAULT, null),
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
null,
null,
null,

View File

@ -132,7 +132,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper();
private static final InputFormat INPUT_FORMAT = new JsonInputFormat(
new JSONPathSpec(true, ImmutableList.of()),
ImmutableMap.of()
ImmutableMap.of(),
false
);
private static final String TOPIC_PREFIX = "testTopic";
private static final String DATASOURCE = "testDS";

View File

@ -135,7 +135,7 @@ public class KinesisSamplerSpecTest extends EasyMockSupport
null,
new KinesisSupervisorIOConfig(
STREAM,
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of()),
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false),
null,
null,
null,

View File

@ -114,7 +114,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper();
private static final InputFormat INPUT_FORMAT = new JsonInputFormat(
new JSONPathSpec(true, ImmutableList.of()),
ImmutableMap.of()
ImmutableMap.of(),
false
);
private static final String DATASOURCE = "testDS";
private static final int TEST_CHAT_THREADS = 3;

View File

@ -540,7 +540,8 @@ public class UriExtractionNamespace implements ExtractionNamespace
new JSONPathFieldSpec(JSONPathFieldType.ROOT, valueFieldName, valueFieldName)
)
),
jsonMapper.copy()
jsonMapper.copy(),
false
),
keyFieldName,
valueFieldName

View File

@ -71,7 +71,9 @@ public class ProtobufInputRowParserTest
new JSONPathFieldSpec(JSONPathFieldType.PATH, "foobar", "$.foo.bar"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "bar0", "$.bar[0].bar")
)
), null
),
null,
null
);
}

View File

@ -363,7 +363,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null),
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
null
);
@ -389,7 +389,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null),
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new MaxSizeSplitHintSpec(1L) // set maxSplitSize to 1 so that each inputSplit has only one object
);
@ -416,7 +416,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null),
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new MaxSizeSplitHintSpec(CONTENT.length * 3L)
);
@ -446,7 +446,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null),
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
null
);
Assert.assertEquals(
@ -480,7 +480,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
);
inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null),
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
null
).collect(Collectors.toList());
}

View File

@ -70,7 +70,8 @@ public class HadoopDruidIndexerMapperTest
null
),
new JSONPathSpec(true, ImmutableList.of()),
ImmutableMap.of()
ImmutableMap.of(),
null
)
),
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT

View File

@ -336,6 +336,7 @@ public class IndexGeneratorJobTest
new TimestampSpec("ts", "yyyyMMddHH", null),
new DimensionsSpec(null, null, null),
null,
null,
null
),
null
@ -376,6 +377,7 @@ public class IndexGeneratorJobTest
"Y"
)), null, null),
null,
null,
null
),
null

View File

@ -80,7 +80,8 @@ public class JobHelperTest
null
),
new JSONPathSpec(true, ImmutableList.of()),
ImmutableMap.of()
ImmutableMap.of(),
null
)
),
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT

View File

@ -1156,7 +1156,7 @@ public class IndexTaskTest extends IngestionTestBase
tmpDir,
timestampSpec,
dimensionsSpec,
new JsonInputFormat(null, null),
new JsonInputFormat(null, null, null),
null,
null,
tuningConfig,
@ -1166,7 +1166,7 @@ public class IndexTaskTest extends IngestionTestBase
ingestionSpec = createIngestionSpec(
jsonMapper,
tmpDir,
new JSONParseSpec(timestampSpec, dimensionsSpec, null, null),
new JSONParseSpec(timestampSpec, dimensionsSpec, null, null, null),
null,
null,
tuningConfig,

View File

@ -273,6 +273,6 @@ class ParallelIndexTestingFactory
static InputFormat getInputFormat()
{
return new JsonInputFormat(null, null);
return new JsonInputFormat(null, null, null);
}
}

View File

@ -43,7 +43,7 @@ public class PartialHashSegmentGenerateTaskTest
private static final ObjectMapper OBJECT_MAPPER = ParallelIndexTestingFactory.createObjectMapper();
private static final ParallelIndexIngestionSpec INGESTION_SPEC = ParallelIndexTestingFactory.createIngestionSpec(
new LocalInputSource(new File("baseDir"), "filer"),
new JsonInputFormat(null, null),
new JsonInputFormat(null, null, null),
new ParallelIndexTestingFactory.TuningConfigBuilder().build(),
ParallelIndexTestingFactory.createDataSchema(ParallelIndexTestingFactory.INPUT_INTERVALS)
);

View File

@ -54,7 +54,7 @@ public class SinglePhaseSubTaskSpecTest
new ParallelIndexIOConfig(
null,
new LocalInputSource(new File("baseDir"), "filter"),
new JsonInputFormat(null, null),
new JsonInputFormat(null, null, null),
null
),
null

View File

@ -240,6 +240,7 @@ public class IngestSegmentFirehoseFactoryTest
ImmutableList.of()
),
null,
null,
null
)
)

View File

@ -95,6 +95,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest
null
),
null,
null,
null
)
)

View File

@ -1088,7 +1088,7 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
{
switch (parserType) {
case STR_JSON:
return new JsonInputFormat(null, null);
return new JsonInputFormat(null, null, null);
case STR_CSV:
return new CsvInputFormat(ImmutableList.of("t", "dim1", "dim2", "met1"), null, null, false, 0);
default:
@ -1100,7 +1100,7 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
{
switch (parserType) {
case STR_JSON:
return new StringInputRowParser(new JSONParseSpec(timestampSpec, dimensionsSpec, null, null));
return new StringInputRowParser(new JSONParseSpec(timestampSpec, dimensionsSpec, null, null, null));
case STR_CSV:
return new StringInputRowParser(
new DelimitedParseSpec(

View File

@ -126,7 +126,8 @@ public class SeekableStreamIndexTaskTestBase extends EasyMockSupport
);
protected static final InputFormat INPUT_FORMAT = new JsonInputFormat(
new JSONPathSpec(true, ImmutableList.of()),
ImmutableMap.of()
ImmutableMap.of(),
null
);
protected static final Logger LOG = new Logger(SeekableStreamIndexTaskTestBase.class);
protected static ListeningExecutorService taskExec;
@ -165,7 +166,8 @@ public class SeekableStreamIndexTaskTestBase extends EasyMockSupport
null
),
new JSONPathSpec(true, ImmutableList.of()),
ImmutableMap.of()
ImmutableMap.of(),
false
),
StandardCharsets.UTF_8.name()
),

View File

@ -66,7 +66,8 @@ public class StreamChunkParserTest
TIMESTAMP_SPEC,
DimensionsSpec.EMPTY,
JSONPathSpec.DEFAULT,
Collections.emptyMap()
Collections.emptyMap(),
false
),
StringUtils.UTF8_STRING
);
@ -84,7 +85,7 @@ public class StreamChunkParserTest
@Test
public void testWithNullParserAndInputformatParseProperly() throws IOException
{
final JsonInputFormat inputFormat = new JsonInputFormat(JSONPathSpec.DEFAULT, Collections.emptyMap());
final JsonInputFormat inputFormat = new JsonInputFormat(JSONPathSpec.DEFAULT, Collections.emptyMap(), null);
final StreamChunkParser chunkParser = new StreamChunkParser(
null,
inputFormat,
@ -117,7 +118,8 @@ public class StreamChunkParserTest
TIMESTAMP_SPEC,
DimensionsSpec.EMPTY,
JSONPathSpec.DEFAULT,
Collections.emptyMap()
Collections.emptyMap(),
false
),
StringUtils.UTF8_STRING
);
@ -153,7 +155,7 @@ public class StreamChunkParserTest
private TrackingJsonInputFormat(@Nullable JSONPathSpec flattenSpec, @Nullable Map<String, Boolean> featureSpec)
{
super(flattenSpec, featureSpec);
super(flattenSpec, featureSpec, null);
}
@Override

View File

@ -757,7 +757,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
EasyMock.expect(spec.getIoConfig()).andReturn(new SeekableStreamSupervisorIOConfig(
"stream",
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of()),
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false),
1,
1,
new Period("PT1H"),
@ -811,7 +811,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
{
return new SeekableStreamSupervisorIOConfig(
"stream",
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of()),
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false),
1,
1,
new Period("PT1H"),

View File

@ -127,6 +127,7 @@ public class DoubleStorageTest
ImmutableList.of()
),
null,
null,
null
)
);

View File

@ -77,6 +77,7 @@ public class DataSchemaTest extends InitializedNullHandlingTest
new TimestampSpec("time", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dimB", "dimA")), null, null),
null,
null,
null
),
null
@ -114,6 +115,7 @@ public class DataSchemaTest extends InitializedNullHandlingTest
null
),
null,
null,
null
),
null
@ -151,6 +153,7 @@ public class DataSchemaTest extends InitializedNullHandlingTest
null
),
null,
null,
null
),
null
@ -209,6 +212,7 @@ public class DataSchemaTest extends InitializedNullHandlingTest
"metric1"
)), ImmutableList.of("dimC"), null),
null,
null,
null
),
null
@ -242,6 +246,7 @@ public class DataSchemaTest extends InitializedNullHandlingTest
null
),
null,
null,
null
),
null
@ -307,6 +312,7 @@ public class DataSchemaTest extends InitializedNullHandlingTest
null
),
null,
null,
null
),
null
@ -401,6 +407,7 @@ public class DataSchemaTest extends InitializedNullHandlingTest
new TimestampSpec("xXx", null, null),
new DimensionsSpec(null, Arrays.asList("metric1", "xXx", "col1"), null),
null,
null,
null
)
);
@ -467,6 +474,7 @@ public class DataSchemaTest extends InitializedNullHandlingTest
new TimestampSpec("time", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dimB", "dimA")), null, null),
null,
null,
null
),
null
@ -506,6 +514,7 @@ public class DataSchemaTest extends InitializedNullHandlingTest
new TimestampSpec("time", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dimB", "dimA")), null, null),
null,
null,
null
),
null

View File

@ -90,6 +90,7 @@ public class FireDepartmentTest
null
),
null,
null,
null
),
null

View File

@ -130,6 +130,7 @@ public class AppenderatorTester implements AutoCloseable
new TimestampSpec("ts", "auto", null),
new DimensionsSpec(null, null, null),
null,
null,
null
)
),

View File

@ -115,6 +115,7 @@ public class DefaultOfflineAppenderatorFactoryTest
new TimestampSpec("ts", "auto", null),
new DimensionsSpec(null, null, null),
null,
null,
null
)
),

View File

@ -77,6 +77,7 @@ public class EventReceiverFirehoseIdleTest
null
), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("d1")), null, null),
null,
null,
null
)
),

View File

@ -93,6 +93,7 @@ public class EventReceiverFirehoseTest
null
), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("d1")), null, null),
null,
null,
null
)
),
@ -240,6 +241,7 @@ public class EventReceiverFirehoseTest
null
), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("d1")), null, null),
null,
null,
null
)
),

View File

@ -141,6 +141,7 @@ public class RealtimePlumberSchoolTest
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(null, null, null),
null,
null,
null
),
null
@ -161,6 +162,7 @@ public class RealtimePlumberSchoolTest
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(null, null, null),
null,
null,
null
),
null

View File

@ -181,6 +181,7 @@ async function validateQuery(page: playwright.Page, datasourceName: string) {
/* isNew */ 'false',
/* isRobot */ 'false',
/* isUnpatrolled */ 'false',
/* metroCode */ 'null',
/* namespace */ 'Talk',
/* page */ 'Talk:Oswald Tilghman',
/* regionIsoCode */ 'null',

View File

@ -230,6 +230,7 @@ export interface InputFormat {
pattern?: string;
function?: string;
flattenSpec?: FlattenSpec;
keepNullColumns?: boolean;
}
export type DimensionMode = 'specific' | 'auto-detect';

View File

@ -107,6 +107,9 @@ export function getCacheRowsFromSampleResponse(
export function applyCache(sampleSpec: SampleSpec, cacheRows: CacheRows) {
if (!cacheRows) return sampleSpec;
// In order to prevent potential data loss null columns should be kept by the sampler and shown in the ingestion flow
sampleSpec = deepSet(sampleSpec, 'spec.ioConfig.inputFormat.keepNullColumns', true);
// If this is already an inline spec there is nothing to do
if (deepGet(sampleSpec, 'spec.ioConfig.inputSource.type') === 'inline') return sampleSpec;
@ -120,7 +123,7 @@ export function applyCache(sampleSpec: SampleSpec, cacheRows: CacheRows) {
});
const flattenSpec = deepGet(sampleSpec, 'spec.ioConfig.inputFormat.flattenSpec');
const inputFormat: InputFormat = { type: 'json' };
const inputFormat: InputFormat = { type: 'json', keepNullColumns: true };
if (flattenSpec) inputFormat.flattenSpec = flattenSpec;
sampleSpec = deepSet(sampleSpec, 'spec.ioConfig.inputFormat', inputFormat);
@ -198,6 +201,8 @@ function makeSamplerIoConfig(
} else if (specType === 'kinesis') {
ioConfig = deepSet(ioConfig, 'useEarliestSequenceNumber', sampleStrategy === 'start');
}
// In order to prevent potential data loss null columns should be kept by the sampler and shown in the ingestion flow
ioConfig = deepSet(ioConfig, 'inputFormat.keepNullColumns', true);
return ioConfig;
}

View File

@ -104,6 +104,7 @@ describe('test-utils', () => {
},
"ioConfig": Object {
"inputFormat": Object {
"keepNullColumns": true,
"type": "json",
},
"inputSource": Object {