Fix Parquet Reader when ingestion needs to read columns in filter (#16874)

This commit is contained in:
Maytas Monsereenusorn 2024-08-15 02:31:38 +07:00 committed by GitHub
parent 204533cade
commit c2ddff399d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 38 additions and 1 deletions

View File

@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
import org.apache.druid.java.util.common.parsers.JSONPathFieldType; import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
import org.apache.druid.java.util.common.parsers.JSONPathSpec; import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.segment.transform.Transform; import org.apache.druid.segment.transform.Transform;
import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.segment.transform.TransformSpec;
@ -130,12 +131,18 @@ public class ReaderUtils
} }
} }
// Determine any fields we need to read from input file that is used in the transformSpec // Determine any fields we need to read from input file that is used in the transform of the transformSpec
List<Transform> transforms = transformSpec.getTransforms(); List<Transform> transforms = transformSpec.getTransforms();
for (Transform transform : transforms) { for (Transform transform : transforms) {
fieldsRequired.addAll(transform.getRequiredColumns()); fieldsRequired.addAll(transform.getRequiredColumns());
} }
// Determine any fields we need to read from input file that is used in the filter of the transformSpec
DimFilter filter = transformSpec.getFilter();
if (filter != null) {
fieldsRequired.addAll(filter.getRequiredColumns());
}
// Determine any fields we need to read from input file that is used in the dimensionsSpec // Determine any fields we need to read from input file that is used in the dimensionsSpec
List<DimensionSchema> dimensionSchema = dimensionsSpec.getDimensions(); List<DimensionSchema> dimensionSchema = dimensionsSpec.getDimensions();
for (DimensionSchema dim : dimensionSchema) { for (DimensionSchema dim : dimensionSchema) {

View File

@ -34,6 +34,8 @@ import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.FloatMinAggregatorFactory; import org.apache.druid.query.aggregation.FloatMinAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.expression.TestExprMacroTable; import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.filter.AndDimFilter;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.segment.transform.ExpressionTransform; import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.testing.InitializedNullHandlingTest; import org.apache.druid.testing.InitializedNullHandlingTest;
@ -364,4 +366,32 @@ public class ReaderUtilsTest extends InitializedNullHandlingTest
Set<String> actual = ReaderUtils.getColumnsRequiredForIngestion(fullInputSchema, timestampSpec, dimensionsSpec, TransformSpec.NONE, new AggregatorFactory[]{}, flattenSpec); Set<String> actual = ReaderUtils.getColumnsRequiredForIngestion(fullInputSchema, timestampSpec, dimensionsSpec, TransformSpec.NONE, new AggregatorFactory[]{}, flattenSpec);
Assert.assertEquals(ImmutableSet.of("A", "C"), actual); Assert.assertEquals(ImmutableSet.of("A", "C"), actual);
} }
/**
 * Columns referenced only by the filter of the transformSpec ("G" and "H" here) must be reported as
 * required for ingestion, together with the timestamp column "A" and the declared dimensions "B".."F".
 */
@Test
public void testGetColumnsRequiredForIngestionWithFilterInTransformSpec()
{
  // "G" and "H" appear only in the filter: they are neither the timestamp column nor a dimension.
  final AndDimFilter filterOnGAndH = new AndDimFilter(
      ImmutableList.of(
          new SelectorDimFilter("G", "foo", null),
          new SelectorDimFilter("H", "foobar", null)
      )
  );
  // A transformSpec carrying the filter above and no transforms.
  final TransformSpec filterOnlyTransformSpec = new TransformSpec(filterOnGAndH, ImmutableList.of());

  final TimestampSpec tsSpec = new TimestampSpec("A", "iso", null);
  final DimensionsSpec dimsSpec = new DimensionsSpec(
      Arrays.asList(
          new StringDimensionSchema("B"),
          new StringDimensionSchema("C"),
          new LongDimensionSchema("D"),
          new FloatDimensionSchema("E"),
          new LongDimensionSchema("F")
      )
  );

  final Set<String> expected = ImmutableSet.of("A", "B", "C", "D", "E", "F", "G", "H");
  // No aggregators and no flattenSpec are supplied.
  final Set<String> required = ReaderUtils.getColumnsRequiredForIngestion(
      fullInputSchema,
      tsSpec,
      dimsSpec,
      filterOnlyTransformSpec,
      new AggregatorFactory[]{},
      null
  );
  Assert.assertEquals(expected, required);
}
} }