From 1a883ba1f7b6f6ccc8f166658347ea88c5c61639 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Mon, 24 Jun 2024 09:03:30 +0530 Subject: [PATCH] Fix complex columns with export (#16572) This PR fixes a few bugs with MSQ export. The main change is calling SqlResults#coerce before writing the column. This allows sketches and json to be correctly deserialized. The format of the exported complex columns are similar to those produced by Async MSQ queries with CSV format. Notes: Fix printing of complex columns during export. Sketches and JSON are now correctly formatted during export. Fix an NPE if the writer has not been initialized. Empty export queries will create an empty file at the location. Fix a bug with counters for MSQ export, where rows were reported for only the first partition. --- .../druid/msq/counters/ChannelCounters.java | 7 +- .../apache/druid/msq/exec/ControllerImpl.java | 9 +- .../apache/druid/msq/exec/ResultsContext.java | 9 +- .../results/ExportResultsFrameProcessor.java | 34 +++- .../ExportResultsFrameProcessorFactory.java | 20 ++- .../apache/druid/msq/exec/MSQExportTest.java | 150 +++++++++++++++++- .../msq/exec/ResultsContextSerdeTest.java | 107 +++++++++++++ ...xportResultsFrameProcessorFactoryTest.java | 1 + .../apache/druid/msq/test/MSQTestBase.java | 1 + 9 files changed, 318 insertions(+), 20 deletions(-) create mode 100644 extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ResultsContextSerdeTest.java diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/ChannelCounters.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/ChannelCounters.java index 3578fa21a6e..1cf017635a6 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/ChannelCounters.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/ChannelCounters.java @@ -55,7 +55,12 @@ public class ChannelCounters implements QueryCounter public void incrementRowCount() { - add(NO_PARTITION, 1, 0, 0, 0); + incrementRowCount(NO_PARTITION); + } + + public void incrementRowCount(int partition) + { + add(partition, 1, 0, 0, 0); } public void incrementBytes(long bytes) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 72728c60564..b6541c7f26a 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -582,7 +582,8 @@ public class ControllerImpl implements Controller queryId(), makeQueryControllerToolKit(), querySpec, - context.jsonMapper() + context.jsonMapper(), + resultsContext ); if (log.isDebugEnabled()) { @@ -1673,7 +1674,8 @@ public class ControllerImpl implements Controller final String queryId, @SuppressWarnings("rawtypes") final QueryKit toolKit, final MSQSpec querySpec, - final ObjectMapper jsonMapper + final ObjectMapper jsonMapper, + final ResultsContext resultsContext ) { final MSQTuningConfig tuningConfig = querySpec.getTuningConfig(); @@ -1836,7 +1838,8 @@ public class ControllerImpl implements Controller queryId, exportStorageProvider, resultFormat, - columnMappings + columnMappings, + resultsContext )) ); return builder.build(); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ResultsContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ResultsContext.java index 9e565bb75a5..209113f080a 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ResultsContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ResultsContext.java @@ -19,6 +19,8 @@ package org.apache.druid.msq.exec; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.druid.sql.calcite.run.SqlResults; @@ -34,21 +36,24 @@ public class ResultsContext private final List sqlTypeNames; private final SqlResults.Context sqlResultsContext; + @JsonCreator public ResultsContext( - final List sqlTypeNames, - final SqlResults.Context sqlResultsContext + @JsonProperty("sqlTypeNames") final List sqlTypeNames, + @JsonProperty("sqlResultsContext") final SqlResults.Context sqlResultsContext ) { this.sqlTypeNames = sqlTypeNames; this.sqlResultsContext = sqlResultsContext; } + @JsonProperty("sqlTypeNames") @Nullable public List getSqlTypeNames() { return sqlTypeNames; } + @JsonProperty("sqlResultsContext") @Nullable public SqlResults.Context getSqlResultsContext() { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java index e3635338231..ccedf0402c6 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java @@ -36,6 +36,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.msq.counters.ChannelCounters; +import org.apache.druid.msq.exec.ResultsContext; import org.apache.druid.msq.util.SequenceUtils; import org.apache.druid.segment.BaseObjectColumnValueSelector; import org.apache.druid.segment.ColumnSelectorFactory; @@ -44,6 +45,7 @@ import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.sql.calcite.planner.ColumnMapping; import org.apache.druid.sql.calcite.planner.ColumnMappings; +import org.apache.druid.sql.calcite.run.SqlResults; import org.apache.druid.sql.http.ResultFormat; import org.apache.druid.storage.StorageConnector; @@ -64,6 +66,8 @@ public class ExportResultsFrameProcessor implements FrameProcessor private final String exportFilePath; private final Object2IntMap outputColumnNameToFrameColumnNumberMap; private final RowSignature exportRowSignature; + private final ResultsContext resultsContext; + private final int partitionNum; private volatile ResultFormat.Writer exportWriter; @@ -75,7 +79,9 @@ public class ExportResultsFrameProcessor implements FrameProcessor final ObjectMapper jsonMapper, final ChannelCounters channelCounter, final String exportFilePath, - final ColumnMappings columnMappings + final ColumnMappings columnMappings, + final ResultsContext resultsContext, + final int partitionNum ) { this.inputChannel = inputChannel; @@ -85,6 +91,8 @@ public class ExportResultsFrameProcessor implements FrameProcessor this.jsonMapper = jsonMapper; this.channelCounter = channelCounter; this.exportFilePath = exportFilePath; + this.resultsContext = resultsContext; + this.partitionNum = partitionNum; this.outputColumnNameToFrameColumnNumberMap = new Object2IntOpenHashMap<>(); final RowSignature inputRowSignature = frameReader.signature(); @@ -130,13 +138,13 @@ public class ExportResultsFrameProcessor implements FrameProcessor return ReturnOrAwait.awaitAll(1); } + if (exportWriter == null) { + createExportWriter(); + } if (inputChannel.isFinished()) { exportWriter.writeResponseEnd(); return ReturnOrAwait.returnObject(exportFilePath); } else { - if (exportWriter == null) { - createExportWriter(); - } exportFrame(inputChannel.read()); return ReturnOrAwait.awaitAll(1); } @@ -167,9 +175,23 @@ public class ExportResultsFrameProcessor implements FrameProcessor for (int j = 0; j < exportRowSignature.size(); j++) { String columnName = exportRowSignature.getColumnName(j); BaseObjectColumnValueSelector selector = selectors.get(outputColumnNameToFrameColumnNumberMap.getInt(columnName)); - exportWriter.writeRowField(columnName, selector.getObject()); + if (resultsContext == null) { + throw DruidException.forPersona(DruidException.Persona.OPERATOR) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build("Received null resultsContext from the controller. This is due to a version mismatch between the controller and the worker. Please ensure that the worker and the controller are on the same version before retrying the query."); + } + exportWriter.writeRowField( + columnName, + SqlResults.coerce( + jsonMapper, + resultsContext.getSqlResultsContext(), + selector.getObject(), + resultsContext.getSqlTypeNames().get(j), + columnName + ) + ); } - channelCounter.incrementRowCount(); + channelCounter.incrementRowCount(partitionNum); exportWriter.writeRowEnd(); cursor.advance(); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactory.java index beb626f0fce..af4038cd247 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactory.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactory.java @@ -36,6 +36,7 @@ import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.msq.counters.ChannelCounters; import org.apache.druid.msq.counters.CounterNames; import org.apache.druid.msq.counters.CounterTracker; +import org.apache.druid.msq.exec.ResultsContext; import org.apache.druid.msq.input.InputSlice; import org.apache.druid.msq.input.InputSliceReader; import org.apache.druid.msq.input.ReadableInput; @@ -63,19 +64,22 @@ public class ExportResultsFrameProcessorFactory implements FrameProcessorFactory private final ExportStorageProvider exportStorageProvider; private final ResultFormat exportFormat; private final ColumnMappings columnMappings; + private final ResultsContext resultsContext; @JsonCreator public ExportResultsFrameProcessorFactory( @JsonProperty("queryId") String queryId, @JsonProperty("exportStorageProvider") ExportStorageProvider exportStorageProvider, @JsonProperty("exportFormat") ResultFormat exportFormat, - @JsonProperty("columnMappings") @Nullable ColumnMappings columnMappings + @JsonProperty("columnMappings") @Nullable ColumnMappings columnMappings, + @JsonProperty("resultsContext") @Nullable ResultsContext resultsContext ) { this.queryId = queryId; this.exportStorageProvider = exportStorageProvider; this.exportFormat = exportFormat; this.columnMappings = columnMappings; + this.resultsContext = resultsContext; } @JsonProperty("queryId") @@ -105,6 +109,14 @@ public class ExportResultsFrameProcessorFactory implements FrameProcessorFactory return columnMappings; } + @JsonProperty("resultsContext") + @JsonInclude(JsonInclude.Include.NON_NULL) + @Nullable + public ResultsContext getResultsContext() + { + return resultsContext; + } + @Override public ProcessorsAndChannels makeProcessors( StageDefinition stageDefinition, @@ -132,7 +144,7 @@ public class ExportResultsFrameProcessorFactory implements FrameProcessorFactory ); } - ChannelCounters channelCounter = counters.channel(CounterNames.outputChannel()); + final ChannelCounters channelCounter = counters.channel(CounterNames.outputChannel()); final Sequence readableInputs = Sequences.simple(inputSliceReader.attach(0, slice, counters, warningPublisher)); @@ -145,7 +157,9 @@ public class ExportResultsFrameProcessorFactory implements FrameProcessorFactory frameContext.jsonMapper(), channelCounter, getExportFilePath(queryId, workerNumber, readableInput.getStagePartition().getPartitionNumber(), exportFormat), - columnMappings + columnMappings, + resultsContext, + readableInput.getStagePartition().getPartitionNumber() ) ); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQExportTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQExportTest.java index edc98dcea98..71b816e78c5 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQExportTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQExportTest.java @@ -62,8 +62,8 @@ public class MSQExportTest extends MSQTestBase .verifyResults(); Assert.assertEquals( - 2, // result file and manifest file - Objects.requireNonNull(new File(exportDir.getAbsolutePath()).listFiles()).length + 2, // result file and manifest file + Objects.requireNonNull(exportDir.listFiles()).length ); File resultFile = new File(exportDir, "query-test-query-worker0-partition0.csv"); @@ -81,7 +81,7 @@ public class MSQExportTest extends MSQTestBase .add("dim1", ColumnType.STRING) .add("cnt", ColumnType.LONG).build(); - File exportDir = newTempFolder("export"); + final File exportDir = newTempFolder("export"); final String sql = StringUtils.format("insert into extern(local(exportPath=>'%s')) as csv select dim1 as table_dim, count(*) as table_count from foo where dim1 = 'abc' group by 1", exportDir.getAbsolutePath()); testIngestQuery().setSql(sql) @@ -94,7 +94,7 @@ public class MSQExportTest extends MSQTestBase Assert.assertEquals( 2, - Objects.requireNonNull(new File(exportDir.getAbsolutePath()).listFiles()).length + Objects.requireNonNull(exportDir.listFiles()).length ); @@ -133,7 +133,147 @@ public class MSQExportTest extends MSQTestBase Assert.assertEquals( expectedFooFileContents(false).size() + 1, // + 1 for the manifest file - Objects.requireNonNull(new File(exportDir.getAbsolutePath()).listFiles()).length + Objects.requireNonNull(exportDir.listFiles()).length + ); + } + + @Test + void testExportComplexColumns() throws IOException + { + final RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("a", ColumnType.LONG) + .add("b", ColumnType.LONG) + .add("c_json", ColumnType.STRING).build(); + + final File exportDir = newTempFolder("export"); + final String sql = StringUtils.format("INSERT INTO\n" + + "EXTERN(local(exportPath=>'%s'))\n" + + "AS CSV\n" + + "SELECT\n" + + " \"a\",\n" + + " \"b\",\n" + + " json_object(key 'c' value b) c_json\n" + + "FROM (\n" + + " SELECT *\n" + + " FROM TABLE(\n" + + " EXTERN(\n" + + " '{\"type\":\"inline\",\"data\":\"a,b\\n1,1\\n2,2\"}',\n" + + " '{\"type\":\"csv\",\"findColumnsFromHeader\":true}'\n" + + " )\n" + + " ) EXTEND (\"a\" BIGINT, \"b\" BIGINT)\n" + + ")", exportDir.getAbsolutePath()); + + testIngestQuery().setSql(sql) + .setExpectedDataSource("foo1") + .setQueryContext(DEFAULT_MSQ_CONTEXT) + .setExpectedRowSignature(rowSignature) + .setExpectedSegment(ImmutableSet.of()) + .setExpectedResultRows(ImmutableList.of()) + .verifyResults(); + + Assert.assertEquals( + 2, // result file and manifest file + Objects.requireNonNull(exportDir.listFiles()).length + ); + + File resultFile = new File(exportDir, "query-test-query-worker0-partition0.csv"); + List results = readResultsFromFile(resultFile); + Assert.assertEquals( + ImmutableList.of( + "a,b,c_json", "1,1,\"{\"\"c\"\":1}\"", "2,2,\"{\"\"c\"\":2}\"" + ), + results + ); + } + + @Test + void testExportSketchColumns() throws IOException + { + final RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("a", ColumnType.LONG) + .add("b", ColumnType.LONG) + .add("c_json", ColumnType.STRING).build(); + + final File exportDir = newTempFolder("export"); + final String sql = StringUtils.format("INSERT INTO\n" + + "EXTERN(local(exportPath=>'%s'))\n" + + "AS CSV\n" + + "SELECT\n" + + " \"a\",\n" + + " \"b\",\n" + + " ds_hll(b) c_ds_hll\n" + + "FROM (\n" + + " SELECT *\n" + + " FROM TABLE(\n" + + " EXTERN(\n" + + " '{\"type\":\"inline\",\"data\":\"a,b\\n1,b1\\n2,b2\"}',\n" + + " '{\"type\":\"csv\",\"findColumnsFromHeader\":true}'\n" + + " )\n" + + " ) EXTEND (\"a\" BIGINT, \"b\" VARCHAR)\n" + + ")\n" + + "GROUP BY 1,2", exportDir.getAbsolutePath()); + + testIngestQuery().setSql(sql) + .setExpectedDataSource("foo1") + .setQueryContext(DEFAULT_MSQ_CONTEXT) + .setExpectedRowSignature(rowSignature) + .setExpectedSegment(ImmutableSet.of()) + .setExpectedResultRows(ImmutableList.of()) + .verifyResults(); + + Assert.assertEquals( + 2, // result file and manifest file + Objects.requireNonNull(exportDir.listFiles()).length + ); + + File resultFile = new File(exportDir, "query-test-query-worker0-partition0.csv"); + List results = readResultsFromFile(resultFile); + Assert.assertEquals( + ImmutableList.of( + "a,b,c_ds_hll", "1,b1,\"\"\"AgEHDAMIAQBa1y0L\"\"\"", "2,b2,\"\"\"AgEHDAMIAQCi6V0G\"\"\"" + ), + results + ); + } + + @Test + void testEmptyExport() throws IOException + { + RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("dim1", ColumnType.STRING) + .add("cnt", ColumnType.LONG).build(); + + File exportDir = newTempFolder("export"); + final String sql = StringUtils.format("INSERT INTO " + + "EXTERN(local(exportPath=>'%s'))" + + "AS CSV " + + "SELECT cnt, dim1 AS dim " + + "FROM foo " + + "WHERE dim1='nonexistentvalue'", exportDir.getAbsolutePath()); + + testIngestQuery().setSql(sql) + .setExpectedDataSource("foo1") + .setQueryContext(DEFAULT_MSQ_CONTEXT) + .setExpectedRowSignature(rowSignature) + .setExpectedSegment(ImmutableSet.of()) + .setExpectedResultRows(ImmutableList.of()) + .verifyResults(); + + Assert.assertEquals( + 2, // result file and manifest file + Objects.requireNonNull(exportDir.listFiles()).length + ); + + File resultFile = new File(exportDir, "query-test-query-worker0-partition0.csv"); + List results = readResultsFromFile(resultFile); + Assert.assertEquals( + ImmutableList.of( + "cnt,dim" + ), + results ); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ResultsContextSerdeTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ResultsContextSerdeTest.java new file mode 100644 index 00000000000..b6023392576 --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ResultsContextSerdeTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.exec; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.query.QueryRunnerFactoryConglomerate; +import org.apache.druid.query.QuerySegmentWalker; +import org.apache.druid.server.security.AuthConfig; +import org.apache.druid.sql.calcite.planner.CalciteRulesManager; +import org.apache.druid.sql.calcite.planner.CatalogResolver; +import org.apache.druid.sql.calcite.planner.PlannerConfig; +import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.sql.calcite.planner.PlannerToolbox; +import org.apache.druid.sql.calcite.run.NativeSqlEngine; +import org.apache.druid.sql.calcite.run.SqlResults; +import org.apache.druid.sql.calcite.schema.DruidSchema; +import org.apache.druid.sql.calcite.schema.DruidSchemaCatalog; +import org.apache.druid.sql.calcite.schema.NamedDruidSchema; +import org.apache.druid.sql.calcite.schema.NamedViewSchema; +import org.apache.druid.sql.calcite.schema.ViewSchema; +import org.apache.druid.sql.calcite.util.CalciteTests; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; + +public class ResultsContextSerdeTest +{ + private ResultsContext resultsContext; + private ObjectMapper objectMapper; + + @Before + public void setUp() + { + final PlannerToolbox toolbox = new PlannerToolbox( + CalciteTests.createOperatorTable(), + CalciteTests.createExprMacroTable(), + CalciteTests.getJsonMapper(), + new PlannerConfig(), + new DruidSchemaCatalog( + EasyMock.createMock(SchemaPlus.class), + ImmutableMap.of( + "druid", new NamedDruidSchema(EasyMock.createMock(DruidSchema.class), "druid"), + NamedViewSchema.NAME, new NamedViewSchema(EasyMock.createMock(ViewSchema.class)) + ) + ), + CalciteTests.createJoinableFactoryWrapper(), + CatalogResolver.NULL_RESOLVER, + "druid", + new CalciteRulesManager(ImmutableSet.of()), + CalciteTests.TEST_AUTHORIZER_MAPPER, + AuthConfig.newBuilder().build() + ); + final NativeSqlEngine engine = CalciteTests.createMockSqlEngine( + EasyMock.createMock(QuerySegmentWalker.class), + EasyMock.createMock(QueryRunnerFactoryConglomerate.class) + ); + + PlannerContext plannerContext = PlannerContext.create( + toolbox, + "DUMMY", + engine, + Collections.emptyMap(), + null + ); + this.resultsContext = new ResultsContext( + ImmutableList.of(SqlTypeName.DOUBLE, SqlTypeName.TIMESTAMP, SqlTypeName.VARCHAR), + SqlResults.Context.fromPlannerContext(plannerContext) + ); + this.objectMapper = new DefaultObjectMapper(); + } + + @Test + public void testSerde() throws JsonProcessingException + { + String s = objectMapper.writeValueAsString(resultsContext); + + ResultsContext deserialized = objectMapper.readValue(s, ResultsContext.class); + Assert.assertEquals(resultsContext, deserialized); + } +} diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactoryTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactoryTest.java index 90f19164770..6fed86035d6 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactoryTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactoryTest.java @@ -48,5 +48,6 @@ public class ExportResultsFrameProcessorFactoryTest ExportResultsFrameProcessorFactory.class ); Assert.assertNull(exportResultsFrameProcessorFactory.getColumnMappings()); + Assert.assertNull(exportResultsFrameProcessorFactory.getResultsContext()); } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index 8ce22876647..33c1374d2a7 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -530,6 +530,7 @@ public class MSQTestBase extends BaseCalciteQueryTest objectMapper = setupObjectMapper(injector); objectMapper.registerModules(new StorageConnectorModule().getJacksonModules()); objectMapper.registerModules(sqlModule.getJacksonModules()); + objectMapper.registerModules(NestedDataModule.getJacksonModulesList()); doReturn(mock(Request.class)).when(brokerClient).makeRequest(any(), anyString());