diff --git a/server/src/main/java/org/apache/druid/segment/indexing/ReaderUtils.java b/server/src/main/java/org/apache/druid/segment/indexing/ReaderUtils.java index b47d676081f..5298f2605d8 100644 --- a/server/src/main/java/org/apache/druid/segment/indexing/ReaderUtils.java +++ b/server/src/main/java/org/apache/druid/segment/indexing/ReaderUtils.java @@ -60,6 +60,11 @@ public class ReaderUtils // Find columns we need to read from the flattenSpec if (flattenSpec != null) { + if (dimensionsSpec.getDimensions().isEmpty() && flattenSpec.isUseFieldDiscovery()) { + // Schemaless ingestion with useFieldDiscovery needs to read all columns + return fullInputSchema; + } + // Parse columns needed from flattenSpec for (JSONPathFieldSpec fields : flattenSpec.getFields()) { if (fields.getType() == JSONPathFieldType.ROOT) { @@ -117,21 +122,27 @@ public class ReaderUtils fieldsRequired.retainAll(fullInputSchema); return fieldsRequired; } + } else { + // Without flattenSpec, useFieldDiscovery defaults to true and thus needs to read all columns since this is + // schemaless + if (dimensionsSpec.getDimensions().isEmpty()) { + return fullInputSchema; + } } - // Determine any fields we need to read from parquet file that is used in the transformSpec + // Determine any fields we need to read from input file that are used in the transformSpec List<Transform> transforms = transformSpec.getTransforms(); for (Transform transform : transforms) { fieldsRequired.addAll(transform.getRequiredColumns()); } - // Determine any fields we need to read from parquet file that is used in the dimensionsSpec + // Determine any fields we need to read from input file that are used in the dimensionsSpec List<DimensionSchema> dimensionSchema = dimensionsSpec.getDimensions(); for (DimensionSchema dim : dimensionSchema) { fieldsRequired.add(dim.getName()); } - // Determine any fields we need to read from parquet file that is used in the metricsSpec + // Determine any fields we need to read from input file that are used in the metricsSpec 
for (AggregatorFactory agg : aggregators) { fieldsRequired.addAll(agg.requiredFields()); } diff --git a/server/src/test/java/org/apache/druid/segment/indexing/ReaderUtilsTest.java b/server/src/test/java/org/apache/druid/segment/indexing/ReaderUtilsTest.java index 6ffe7bded61..6fddd3b4d65 100644 --- a/server/src/test/java/org/apache/druid/segment/indexing/ReaderUtilsTest.java +++ b/server/src/test/java/org/apache/druid/segment/indexing/ReaderUtilsTest.java @@ -328,4 +328,40 @@ public class ReaderUtilsTest extends InitializedNullHandlingTest Set<String> actual = ReaderUtils.getColumnsRequiredForIngestion(fullInputSchema, timestampSpec, dimensionsSpec, TransformSpec.NONE, new AggregatorFactory[]{}, flattenSpec); Assert.assertEquals(ImmutableSet.of("B", "C"), actual); } + + @Test + public void testGetColumnsRequiredForSchemalessIngestionWithoutFlattenSpec() + { + TimestampSpec timestampSpec = new TimestampSpec("A", "iso", null); + DimensionsSpec dimensionsSpec = DimensionsSpec.EMPTY; + + Set<String> actual = ReaderUtils.getColumnsRequiredForIngestion(fullInputSchema, timestampSpec, dimensionsSpec, TransformSpec.NONE, new AggregatorFactory[]{}, null); + Assert.assertEquals(fullInputSchema, actual); + } + + @Test + public void testGetColumnsRequiredForSchemalessIngestionWithFlattenSpecAndUseFieldDiscovery() + { + TimestampSpec timestampSpec = new TimestampSpec("A", "iso", null); + DimensionsSpec dimensionsSpec = DimensionsSpec.EMPTY; + List<JSONPathFieldSpec> flattenExpr = ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.PATH, "CFlat", "$.C.time") + ); + JSONPathSpec flattenSpec = new JSONPathSpec(true, flattenExpr); + Set<String> actual = ReaderUtils.getColumnsRequiredForIngestion(fullInputSchema, timestampSpec, dimensionsSpec, TransformSpec.NONE, new AggregatorFactory[]{}, flattenSpec); + Assert.assertEquals(fullInputSchema, actual); + } + + @Test + public void testGetColumnsRequiredForSchemalessIngestionWithFlattenSpecAndNotUseFieldDiscovery() + { + TimestampSpec timestampSpec = new 
TimestampSpec("A", "iso", null); + DimensionsSpec dimensionsSpec = DimensionsSpec.EMPTY; + List<JSONPathFieldSpec> flattenExpr = ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.PATH, "CFlat", "$.C.time") + ); + JSONPathSpec flattenSpec = new JSONPathSpec(false, flattenExpr); + Set<String> actual = ReaderUtils.getColumnsRequiredForIngestion(fullInputSchema, timestampSpec, dimensionsSpec, TransformSpec.NONE, new AggregatorFactory[]{}, flattenSpec); + Assert.assertEquals(ImmutableSet.of("A", "C"), actual); + } }