add context flag "useAutoColumnSchemas" to use new auto types for MSQ segment generation (#14175)

This commit is contained in:
Clint Wylie 2023-05-10 15:37:14 -07:00 committed by GitHub
parent 161d12eb44
commit 625c4745b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 188 additions and 32 deletions

View File

@ -1903,12 +1903,19 @@ public class ControllerImpl implements Controller
aggregators,
outputColumnAggregatorFactories,
outputColumnName,
type
type,
query.context()
);
} else {
// complex columns only
if (DimensionHandlerUtils.DIMENSION_HANDLER_PROVIDERS.containsKey(type.getComplexTypeName())) {
dimensions.add(DimensionSchemaUtils.createDimensionSchema(outputColumnName, type));
dimensions.add(
DimensionSchemaUtils.createDimensionSchema(
outputColumnName,
type,
MultiStageQueryContext.useAutoColumnSchemas(query.context())
)
);
} else if (!isRollupQuery) {
aggregators.add(new PassthroughAggregatorFactory(outputColumnName, type.getComplexTypeName()));
} else {
@ -1917,7 +1924,8 @@ public class ControllerImpl implements Controller
aggregators,
outputColumnAggregatorFactories,
outputColumnName,
type
type,
query.context()
);
}
}
@ -1943,13 +1951,20 @@ public class ControllerImpl implements Controller
List<AggregatorFactory> aggregators,
Map<String, AggregatorFactory> outputColumnAggregatorFactories,
String outputColumn,
ColumnType type
ColumnType type,
QueryContext context
)
{
if (outputColumnAggregatorFactories.containsKey(outputColumn)) {
aggregators.add(outputColumnAggregatorFactories.get(outputColumn));
} else {
dimensions.add(DimensionSchemaUtils.createDimensionSchema(outputColumn, type));
dimensions.add(
DimensionSchemaUtils.createDimensionSchema(
outputColumn,
type,
MultiStageQueryContext.useAutoColumnSchemas(context)
)
);
}
}

View File

@ -126,7 +126,8 @@ public class ExternalInputSliceReader implements InputSliceReader
column ->
DimensionSchemaUtils.createDimensionSchema(
column,
signature.getColumnType(column).orElse(null)
signature.getColumnType(column).orElse(null),
false
)
).collect(Collectors.toList())
),

View File

@ -25,10 +25,12 @@ import org.apache.druid.data.input.impl.FloatDimensionSchema;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.segment.AutoTypeColumnSchema;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.ValueType;
import javax.annotation.Nullable;
@ -38,8 +40,23 @@ import javax.annotation.Nullable;
*/
public class DimensionSchemaUtils
{
public static DimensionSchema createDimensionSchema(final String column, @Nullable final ColumnType type)
public static DimensionSchema createDimensionSchema(
final String column,
@Nullable final ColumnType type,
boolean useAutoType
)
{
if (useAutoType) {
// for complex types that are not COMPLEX<json>, we still want to use the handler since 'auto' typing
// only works for the 'standard' built-in typesg
if (type != null && type.is(ValueType.COMPLEX) && !ColumnType.NESTED_DATA.equals(type)) {
final ColumnCapabilities capabilities = ColumnCapabilitiesImpl.createDefault().setType(type);
return DimensionHandlerUtils.getHandlerFromCapabilities(column, capabilities, null)
.getDimensionSchema(capabilities);
}
return new AutoTypeColumnSchema(column);
} else {
// if schema information not available, create a string dimension
if (type == null) {
return new StringDimensionSchema(column);
@ -68,3 +85,4 @@ public class DimensionSchemaUtils
}
}
}
}

View File

@ -63,6 +63,11 @@ import java.util.stream.Collectors;
* <li><b>clusterStatisticsMergeMode</b>: Whether to use parallel or sequential mode for merging of the worker sketches.
* Can be <b>PARALLEL</b>, <b>SEQUENTIAL</b> or <b>AUTO</b>. See {@link ClusterStatisticsMergeMode} for more information on each mode.
* Default value is <b>PARALLEL</b></li>
*
* <li><b>useAutoColumnSchemas</b>: Temporary flag to allow experimentation using
* {@link org.apache.druid.segment.AutoTypeColumnSchema} for all 'standard' type columns during segment generation,
* see {@link DimensionSchemaUtils#createDimensionSchema} for more details.
*
* </ol>
**/
public class MultiStageQueryContext
@ -109,6 +114,8 @@ public class MultiStageQueryContext
public static final String CTX_INDEX_SPEC = "indexSpec";
public static final String CTX_USE_AUTO_SCHEMAS = "useAutoColumnSchemas";
private static final Pattern LOOKS_LIKE_JSON_ARRAY = Pattern.compile("^\\s*\\[.*", Pattern.DOTALL);
public static String getMSQMode(final QueryContext queryContext)
@ -213,6 +220,11 @@ public class MultiStageQueryContext
return decodeIndexSpec(queryContext.get(CTX_INDEX_SPEC), objectMapper);
}
public static boolean useAutoColumnSchemas(final QueryContext queryContext)
{
return queryContext.getBoolean(CTX_USE_AUTO_SCHEMAS, false);
}
/**
* Decodes {@link #CTX_SORT_ORDER} from either a JSON or CSV string.
*/

View File

@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.hll.HyperLogLogCollector;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
@ -34,6 +35,7 @@ import org.apache.druid.msq.test.CounterSnapshotMatcher;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.test.MSQTestFileUtils;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.NestedDataTestUtils;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.apache.druid.segment.column.ColumnType;
@ -41,6 +43,7 @@ import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.sql.SqlPlanningException;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.utils.CompressionUtils;
import org.hamcrest.CoreMatchers;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
@ -51,9 +54,13 @@ import org.mockito.Mockito;
import javax.annotation.Nonnull;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -436,7 +443,7 @@ public class MSQInsertTest extends MSQTestBase
}
@Test
public void testInsertOnFoo1WithLimitWithoutClusterBy()
public void testInsertOnFoo1MultiValueDimWithLimitWithoutClusterBy()
{
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
@ -453,7 +460,7 @@ public class MSQInsertTest extends MSQTestBase
}
@Test
public void testInsertOnFoo1WithLimitWithClusterBy()
public void testInsertOnFoo1MultiValueDimWithLimitWithClusterBy()
{
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
@ -519,6 +526,40 @@ public class MSQInsertTest extends MSQTestBase
.verifyResults();
}
@Test
public void testInsertOnFoo1WithAutoTypeArrayGroupBy()
{
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("dim3", ColumnType.STRING_ARRAY).build();
final Map<String, Object> adjustedContext = new HashMap<>(context);
adjustedContext.put(MultiStageQueryContext.CTX_USE_AUTO_SCHEMAS, true);
testIngestQuery().setSql(
"INSERT INTO foo1 SELECT MV_TO_ARRAY(dim3) as dim3 FROM foo GROUP BY 1 PARTITIONED BY ALL TIME")
.setExpectedDataSource("foo1")
.setExpectedRowSignature(rowSignature)
.setQueryContext(adjustedContext)
.setExpectedSegment(ImmutableSet.of(SegmentId.of("foo1", Intervals.ETERNITY, "test", 0)))
.setExpectedResultRows(
NullHandling.replaceWithDefault() ?
ImmutableList.of(
new Object[]{0L, new Object[]{null}},
new Object[]{0L, new Object[]{"a", "b"}},
new Object[]{0L, new Object[]{"b", "c"}},
new Object[]{0L, new Object[]{"d"}}
) : ImmutableList.of(
new Object[]{0L, new Object[]{null}},
new Object[]{0L, new Object[]{"a", "b"}},
new Object[]{0L, new Object[]{""}},
new Object[]{0L, new Object[]{"b", "c"}},
new Object[]{0L, new Object[]{"d"}}
)
)
.verifyResults();
}
@Test
public void testInsertOnFoo1WithMultiValueDimGroupByWithoutGroupByEnable()
{
@ -1024,6 +1065,64 @@ public class MSQInsertTest extends MSQTestBase
.verifyPlanningErrors();
}
@Test
public void testInsertArraysAutoType() throws IOException
{
List<Object[]> expectedRows = Arrays.asList(
new Object[]{1672531200000L, null, null, null},
new Object[]{1672531200000L, null, new Object[]{1L, 2L, 3L}, new Object[]{1.1, 2.2, 3.3}},
new Object[]{1672531200000L, new Object[]{"d", "e"}, new Object[]{1L, 4L}, new Object[]{2.2, 3.3, 4.0}},
new Object[]{1672531200000L, new Object[]{"a", "b"}, null, null},
new Object[]{1672531200000L, new Object[]{"a", "b"}, new Object[]{1L, 2L, 3L}, new Object[]{1.1, 2.2, 3.3}},
new Object[]{1672531200000L, new Object[]{"b", "c"}, new Object[]{1L, 2L, 3L, 4L}, new Object[]{1.1, 3.3}},
new Object[]{1672531200000L, new Object[]{"a", "b", "c"}, new Object[]{2L, 3L}, new Object[]{3.3, 4.4, 5.5}},
new Object[]{1672617600000L, null, null, null},
new Object[]{1672617600000L, null, new Object[]{1L, 2L, 3L}, new Object[]{1.1, 2.2, 3.3}},
new Object[]{1672617600000L, new Object[]{"d", "e"}, new Object[]{1L, 4L}, new Object[]{2.2, 3.3, 4.0}},
new Object[]{1672617600000L, new Object[]{"a", "b"}, null, null},
new Object[]{1672617600000L, new Object[]{"a", "b"}, new Object[]{1L, 2L, 3L}, new Object[]{1.1, 2.2, 3.3}},
new Object[]{1672617600000L, new Object[]{"b", "c"}, new Object[]{1L, 2L, 3L, 4L}, new Object[]{1.1, 3.3}},
new Object[]{1672617600000L, new Object[]{"a", "b", "c"}, new Object[]{2L, 3L}, new Object[]{3.3, 4.4, 5.5}}
);
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("arrayString", ColumnType.STRING_ARRAY)
.add("arrayLong", ColumnType.LONG_ARRAY)
.add("arrayDouble", ColumnType.DOUBLE_ARRAY)
.build();
final Map<String, Object> adjustedContext = new HashMap<>(context);
adjustedContext.put(MultiStageQueryContext.CTX_USE_AUTO_SCHEMAS, true);
final File tmpFile = temporaryFolder.newFile();
final InputStream resourceStream = NestedDataTestUtils.class.getClassLoader().getResourceAsStream(NestedDataTestUtils.ARRAY_TYPES_DATA_FILE);
final InputStream decompressing = CompressionUtils.decompress(resourceStream, NestedDataTestUtils.ARRAY_TYPES_DATA_FILE);
Files.copy(decompressing, tmpFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
decompressing.close();
final String toReadFileNameAsJson = queryFramework().queryJsonMapper().writeValueAsString(tmpFile);
testIngestQuery().setSql(" INSERT INTO foo1 SELECT\n"
+ " TIME_PARSE(\"timestamp\") as __time,\n"
+ " arrayString,\n"
+ " arrayLong,\n"
+ " arrayDouble\n"
+ "FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{ \"files\": [" + toReadFileNameAsJson + "],\"type\":\"local\"}',\n"
+ " '{\"type\": \"json\"}',\n"
+ " '[{\"name\": \"timestamp\", \"type\": \"STRING\"}, {\"name\": \"arrayString\", \"type\": \"COMPLEX<json>\"}, {\"name\": \"arrayLong\", \"type\": \"COMPLEX<json>\"}, {\"name\": \"arrayDouble\", \"type\": \"COMPLEX<json>\"}]'\n"
+ " )\n"
+ ") PARTITIONED BY day")
.setQueryContext(adjustedContext)
.setExpectedResultRows(expectedRows)
.setExpectedDataSource("foo1")
.setExpectedRowSignature(rowSignature)
.verifyResults();
}
@Nonnull
private List<Object[]> expectedFooRows()
{

View File

@ -50,6 +50,7 @@ import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.guice.IndexingServiceTuningConfigModule;
import org.apache.druid.guice.JoinableFactoryModule;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.NestedDataModule;
import org.apache.druid.guice.SegmentWranglerModule;
import org.apache.druid.guice.StartupInjectorBuilder;
import org.apache.druid.guice.annotations.EscalatedGlobal;
@ -301,6 +302,8 @@ public class MSQTestBase extends BaseCalciteQueryTest
{
// We want this module to bring InputSourceModule along for the ride.
binder.install(new InputSourceModule());
binder.install(new NestedDataModule());
NestedDataModule.registerHandlersAndSerde();
SqlBindings.addOperatorConversion(binder, ExternalOperatorConversion.class);
}
@ -1129,7 +1132,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
log.info(
"Found rows which are sorted forcefully %s",
transformedOutputRows.stream().map(a -> Arrays.toString(a)).collect(Collectors.joining("\n"))
transformedOutputRows.stream().map(Arrays::deepToString).collect(Collectors.joining("\n"))
);

View File

@ -48,6 +48,7 @@ import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_ROWS_IN_MEMOR
import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_ROWS_PER_SEGMENT;
import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_SORT_ORDER;
import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_TASK_ASSIGNMENT_STRATEGY;
import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_USE_AUTO_SCHEMAS;
import static org.apache.druid.msq.util.MultiStageQueryContext.DEFAULT_MAX_NUM_TASKS;
public class MultiStageQueryContextTest
@ -262,6 +263,13 @@ public class MultiStageQueryContextTest
Assert.assertEquals("nonStrict", MultiStageQueryContext.getMSQMode(QueryContext.of(propertyMap)));
}
@Test
public void testUseAutoSchemas()
{
Map<String, Object> propertyMap = ImmutableMap.of(CTX_USE_AUTO_SCHEMAS, true);
Assert.assertTrue(MultiStageQueryContext.useAutoColumnSchemas(QueryContext.of(propertyMap)));
}
private static List<String> decodeSortOrder(@Nullable final String input)
{
return MultiStageQueryContext.decodeSortOrder(input);