Fix Parquet Reader for schema-less ingestion: need to read all columns (#13689)

* fix stuff

* address comments
This commit is contained in:
Maytas Monsereenusorn 2023-01-18 10:52:12 -10:00 committed by GitHub
parent fa493f1ebc
commit 1582d74f37
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 50 additions and 3 deletions

View File

@ -60,6 +60,11 @@ public class ReaderUtils
// Find columns we need to read from the flattenSpec // Find columns we need to read from the flattenSpec
if (flattenSpec != null) { if (flattenSpec != null) {
if (dimensionsSpec.getDimensions().isEmpty() && flattenSpec.isUseFieldDiscovery()) {
// Schemaless ingestion with useFieldDiscovery needs to read all columns
return fullInputSchema;
}
// Parse columns needed from flattenSpec // Parse columns needed from flattenSpec
for (JSONPathFieldSpec fields : flattenSpec.getFields()) { for (JSONPathFieldSpec fields : flattenSpec.getFields()) {
if (fields.getType() == JSONPathFieldType.ROOT) { if (fields.getType() == JSONPathFieldType.ROOT) {
@ -117,21 +122,27 @@ public class ReaderUtils
fieldsRequired.retainAll(fullInputSchema); fieldsRequired.retainAll(fullInputSchema);
return fieldsRequired; return fieldsRequired;
} }
} else {
// Without flattenSpec, useFieldDiscovery defaults to true, and thus we need to read all
// columns since this is schemaless
if (dimensionsSpec.getDimensions().isEmpty()) {
return fullInputSchema;
}
} }
// Determine any fields we need to read from parquet file that is used in the transformSpec // Determine any fields we need to read from input file that is used in the transformSpec
List<Transform> transforms = transformSpec.getTransforms(); List<Transform> transforms = transformSpec.getTransforms();
for (Transform transform : transforms) { for (Transform transform : transforms) {
fieldsRequired.addAll(transform.getRequiredColumns()); fieldsRequired.addAll(transform.getRequiredColumns());
} }
// Determine any fields we need to read from parquet file that is used in the dimensionsSpec // Determine any fields we need to read from input file that is used in the dimensionsSpec
List<DimensionSchema> dimensionSchema = dimensionsSpec.getDimensions(); List<DimensionSchema> dimensionSchema = dimensionsSpec.getDimensions();
for (DimensionSchema dim : dimensionSchema) { for (DimensionSchema dim : dimensionSchema) {
fieldsRequired.add(dim.getName()); fieldsRequired.add(dim.getName());
} }
// Determine any fields we need to read from parquet file that is used in the metricsSpec // Determine any fields we need to read from input file that is used in the metricsSpec
for (AggregatorFactory agg : aggregators) { for (AggregatorFactory agg : aggregators) {
fieldsRequired.addAll(agg.requiredFields()); fieldsRequired.addAll(agg.requiredFields());
} }

View File

@ -328,4 +328,40 @@ public class ReaderUtilsTest extends InitializedNullHandlingTest
Set<String> actual = ReaderUtils.getColumnsRequiredForIngestion(fullInputSchema, timestampSpec, dimensionsSpec, TransformSpec.NONE, new AggregatorFactory[]{}, flattenSpec); Set<String> actual = ReaderUtils.getColumnsRequiredForIngestion(fullInputSchema, timestampSpec, dimensionsSpec, TransformSpec.NONE, new AggregatorFactory[]{}, flattenSpec);
Assert.assertEquals(ImmutableSet.of("B", "C"), actual); Assert.assertEquals(ImmutableSet.of("B", "C"), actual);
} }
@Test
public void testGetColumnsRequiredForSchemalessIngestionWithoutFlattenSpec()
{
// Schemaless ingestion (empty dimensionsSpec) with no flattenSpec: every column of the
// input schema must be read, since field discovery is implicitly on.
TimestampSpec timestampSpec = new TimestampSpec("A", "iso", null);
Set<String> actual = ReaderUtils.getColumnsRequiredForIngestion(
    fullInputSchema,
    timestampSpec,
    DimensionsSpec.EMPTY,
    TransformSpec.NONE,
    new AggregatorFactory[]{},
    null
);
Assert.assertEquals(fullInputSchema, actual);
}
@Test
public void testGetColumnsRequiredForSchemalessIngestionWithFlattenSpecAndUseFieldDiscovery()
{
// Schemaless ingestion (empty dimensionsSpec) with a flattenSpec whose useFieldDiscovery
// flag is true: the full input schema is still required.
TimestampSpec timestampSpec = new TimestampSpec("A", "iso", null);
JSONPathSpec flattenSpec = new JSONPathSpec(
    true,
    ImmutableList.of(new JSONPathFieldSpec(JSONPathFieldType.PATH, "CFlat", "$.C.time"))
);
Set<String> actual = ReaderUtils.getColumnsRequiredForIngestion(
    fullInputSchema,
    timestampSpec,
    DimensionsSpec.EMPTY,
    TransformSpec.NONE,
    new AggregatorFactory[]{},
    flattenSpec
);
Assert.assertEquals(fullInputSchema, actual);
}
@Test
public void testGetColumnsRequiredForSchemalessIngestionWithFlattenSpecAndNotUseFieldDiscovery()
{
// Schemaless ingestion with useFieldDiscovery=false: only the columns referenced by the
// timestampSpec ("A") and the flattenSpec path ("C") are required, not the whole schema.
TimestampSpec timestampSpec = new TimestampSpec("A", "iso", null);
JSONPathSpec flattenSpec = new JSONPathSpec(
    false,
    ImmutableList.of(new JSONPathFieldSpec(JSONPathFieldType.PATH, "CFlat", "$.C.time"))
);
Set<String> actual = ReaderUtils.getColumnsRequiredForIngestion(
    fullInputSchema,
    timestampSpec,
    DimensionsSpec.EMPTY,
    TransformSpec.NONE,
    new AggregatorFactory[]{},
    flattenSpec
);
Assert.assertEquals(ImmutableSet.of("A", "C"), actual);
}
} }