Fix complex columns with export (#16572)

This PR fixes a few bugs with MSQ export. The main change is calling SqlResults#coerce before writing each column, so that sketch and JSON values are correctly converted to their SQL result form. The format of the exported complex columns is similar to the format produced by async MSQ queries with the CSV result format.

Notes:

- Fix printing of complex columns during export. Sketches and JSON are now correctly formatted during export; a condensed sketch of the change follows this list.
- Fix an NPE when the writer has not been initialized. Empty export queries now create an empty file at the target location.
- Fix a bug with counters for MSQ export, where rows were reported only for the first partition.
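The heart of the change, condensed from the ExportResultsFrameProcessor diff below: each value read from a frame now passes through SqlResults#coerce, using the SQL type information carried by the new ResultsContext, before it reaches the ResultFormat writer. A sketch of the per-row loop (field and method names as they appear in the diff; not runnable on its own):

```java
// Condensed sketch of the fixed export loop; see the full diff below.
// Previously the raw selector object went straight to the writer, so sketches
// and nested JSON were not rendered in their SQL result form.
for (int j = 0; j < exportRowSignature.size(); j++) {
  final String columnName = exportRowSignature.getColumnName(j);
  final Object rawValue =
      selectors.get(outputColumnNameToFrameColumnNumberMap.getInt(columnName)).getObject();
  exportWriter.writeRowField(
      columnName,
      SqlResults.coerce(
          jsonMapper,                              // serializes nested JSON
          resultsContext.getSqlResultsContext(),   // result options from the controller
          rawValue,
          resultsContext.getSqlTypeNames().get(j), // SQL type drives the coercion
          columnName
      )
  );
}
```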
Adarsh Sanjeev 2024-06-24 09:03:30 +05:30 committed by GitHub
parent 641f739a47
commit 1a883ba1f7
9 changed files with 318 additions and 20 deletions

ChannelCounters.java

@@ -55,7 +55,12 @@ public class ChannelCounters implements QueryCounter
   public void incrementRowCount()
   {
-    add(NO_PARTITION, 1, 0, 0, 0);
+    incrementRowCount(NO_PARTITION);
+  }
+
+  public void incrementRowCount(int partition)
+  {
+    add(partition, 1, 0, 0, 0);
   }
 
   public void incrementBytes(long bytes)
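For context: the no-arg overload keeps its old behavior by delegating with NO_PARTITION, while export processors can now report rows against the partition they are actually writing. A tiny usage sketch (the variable names here are illustrative):

```java
// Rows written by an export worker are now attributed to the partition being
// written, rather than always to NO_PARTITION:
channelCounters.incrementRowCount(partitionNumber);

// Existing callers are unaffected; this still counts against NO_PARTITION:
channelCounters.incrementRowCount();
```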

ControllerImpl.java

@@ -582,7 +582,8 @@ public class ControllerImpl implements Controller
             queryId(),
             makeQueryControllerToolKit(),
             querySpec,
-            context.jsonMapper()
+            context.jsonMapper(),
+            resultsContext
         );
 
     if (log.isDebugEnabled()) {
@@ -1673,7 +1674,8 @@ public class ControllerImpl implements Controller
       final String queryId,
       @SuppressWarnings("rawtypes") final QueryKit toolKit,
       final MSQSpec querySpec,
-      final ObjectMapper jsonMapper
+      final ObjectMapper jsonMapper,
+      final ResultsContext resultsContext
   )
   {
     final MSQTuningConfig tuningConfig = querySpec.getTuningConfig();
@@ -1836,7 +1838,8 @@ public class ControllerImpl implements Controller
                   queryId,
                   exportStorageProvider,
                   resultFormat,
-                  columnMappings
+                  columnMappings,
+                  resultsContext
               ))
       );
       return builder.build();

ResultsContext.java

@@ -19,6 +19,8 @@
 package org.apache.druid.msq.exec;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.druid.sql.calcite.run.SqlResults;
@@ -34,21 +36,24 @@ public class ResultsContext
   private final List<SqlTypeName> sqlTypeNames;
   private final SqlResults.Context sqlResultsContext;
 
+  @JsonCreator
   public ResultsContext(
-      final List<SqlTypeName> sqlTypeNames,
-      final SqlResults.Context sqlResultsContext
+      @JsonProperty("sqlTypeNames") final List<SqlTypeName> sqlTypeNames,
+      @JsonProperty("sqlResultsContext") final SqlResults.Context sqlResultsContext
   )
   {
     this.sqlTypeNames = sqlTypeNames;
     this.sqlResultsContext = sqlResultsContext;
   }
 
+  @JsonProperty("sqlTypeNames")
   @Nullable
   public List<SqlTypeName> getSqlTypeNames()
   {
     return sqlTypeNames;
   }
 
+  @JsonProperty("sqlResultsContext")
   @Nullable
   public SqlResults.Context getSqlResultsContext()
   {
ExportResultsFrameProcessor.java

@@ -36,6 +36,7 @@ import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.msq.counters.ChannelCounters;
+import org.apache.druid.msq.exec.ResultsContext;
 import org.apache.druid.msq.util.SequenceUtils;
 import org.apache.druid.segment.BaseObjectColumnValueSelector;
 import org.apache.druid.segment.ColumnSelectorFactory;
@@ -44,6 +45,7 @@ import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.sql.calcite.planner.ColumnMapping;
 import org.apache.druid.sql.calcite.planner.ColumnMappings;
+import org.apache.druid.sql.calcite.run.SqlResults;
 import org.apache.druid.sql.http.ResultFormat;
 import org.apache.druid.storage.StorageConnector;
@@ -64,6 +66,8 @@ public class ExportResultsFrameProcessor implements FrameProcessor<Object>
   private final String exportFilePath;
   private final Object2IntMap<String> outputColumnNameToFrameColumnNumberMap;
   private final RowSignature exportRowSignature;
+  private final ResultsContext resultsContext;
+  private final int partitionNum;
 
   private volatile ResultFormat.Writer exportWriter;
@@ -75,7 +79,9 @@ public class ExportResultsFrameProcessor implements FrameProcessor<Object>
       final ObjectMapper jsonMapper,
       final ChannelCounters channelCounter,
       final String exportFilePath,
-      final ColumnMappings columnMappings
+      final ColumnMappings columnMappings,
+      final ResultsContext resultsContext,
+      final int partitionNum
   )
   {
     this.inputChannel = inputChannel;
@@ -85,6 +91,8 @@ public class ExportResultsFrameProcessor implements FrameProcessor<Object>
     this.jsonMapper = jsonMapper;
     this.channelCounter = channelCounter;
     this.exportFilePath = exportFilePath;
+    this.resultsContext = resultsContext;
+    this.partitionNum = partitionNum;
     this.outputColumnNameToFrameColumnNumberMap = new Object2IntOpenHashMap<>();
 
     final RowSignature inputRowSignature = frameReader.signature();
@@ -130,13 +138,13 @@ public class ExportResultsFrameProcessor implements FrameProcessor<Object>
       return ReturnOrAwait.awaitAll(1);
     }
 
+    if (exportWriter == null) {
+      createExportWriter();
+    }
     if (inputChannel.isFinished()) {
       exportWriter.writeResponseEnd();
       return ReturnOrAwait.returnObject(exportFilePath);
     } else {
-      if (exportWriter == null) {
-        createExportWriter();
-      }
       exportFrame(inputChannel.read());
       return ReturnOrAwait.awaitAll(1);
     }
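The NPE note in the description corresponds to this reordering: the writer is now created before the isFinished() check, so a query that never produces a frame still gets a writer, and writeResponseEnd() can emit a valid header-only file instead of dereferencing null. The fixed control flow, restated with comments (a sketch; createExportWriter is assumed to open the output and emit any header):

```java
// Sketch of the corrected ordering in runIncrementally(), per the hunk above.
if (exportWriter == null) {
  createExportWriter(); // assumed to open the output and write any header
}
if (inputChannel.isFinished()) {
  exportWriter.writeResponseEnd(); // safe for empty exports: writer always exists
  return ReturnOrAwait.returnObject(exportFilePath);
} else {
  exportFrame(inputChannel.read());
  return ReturnOrAwait.awaitAll(1);
}
```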
@@ -167,9 +175,23 @@ public class ExportResultsFrameProcessor implements FrameProcessor<Object>
       for (int j = 0; j < exportRowSignature.size(); j++) {
         String columnName = exportRowSignature.getColumnName(j);
         BaseObjectColumnValueSelector<?> selector = selectors.get(outputColumnNameToFrameColumnNumberMap.getInt(columnName));
-        exportWriter.writeRowField(columnName, selector.getObject());
+        if (resultsContext == null) {
+          throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+                              .ofCategory(DruidException.Category.RUNTIME_FAILURE)
+                              .build("Received null resultsContext from the controller. This is due to a version mismatch between the controller and the worker. Please ensure that the worker and the controller are on the same version before retrying the query.");
+        }
+        exportWriter.writeRowField(
+            columnName,
+            SqlResults.coerce(
+                jsonMapper,
+                resultsContext.getSqlResultsContext(),
+                selector.getObject(),
+                resultsContext.getSqlTypeNames().get(j),
+                columnName
+            )
+        );
       }
-      channelCounter.incrementRowCount();
+      channelCounter.incrementRowCount(partitionNum);
       exportWriter.writeRowEnd();
       cursor.advance();
     }

ExportResultsFrameProcessorFactory.java

@@ -36,6 +36,7 @@ import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.msq.counters.ChannelCounters;
 import org.apache.druid.msq.counters.CounterNames;
 import org.apache.druid.msq.counters.CounterTracker;
+import org.apache.druid.msq.exec.ResultsContext;
 import org.apache.druid.msq.input.InputSlice;
 import org.apache.druid.msq.input.InputSliceReader;
 import org.apache.druid.msq.input.ReadableInput;
@@ -63,19 +64,22 @@ public class ExportResultsFrameProcessorFactory implements FrameProcessorFactory
   private final ExportStorageProvider exportStorageProvider;
   private final ResultFormat exportFormat;
   private final ColumnMappings columnMappings;
+  private final ResultsContext resultsContext;
 
   @JsonCreator
   public ExportResultsFrameProcessorFactory(
       @JsonProperty("queryId") String queryId,
       @JsonProperty("exportStorageProvider") ExportStorageProvider exportStorageProvider,
       @JsonProperty("exportFormat") ResultFormat exportFormat,
-      @JsonProperty("columnMappings") @Nullable ColumnMappings columnMappings
+      @JsonProperty("columnMappings") @Nullable ColumnMappings columnMappings,
+      @JsonProperty("resultsContext") @Nullable ResultsContext resultsContext
   )
   {
     this.queryId = queryId;
     this.exportStorageProvider = exportStorageProvider;
     this.exportFormat = exportFormat;
     this.columnMappings = columnMappings;
+    this.resultsContext = resultsContext;
   }
 
   @JsonProperty("queryId")
@@ -105,6 +109,14 @@ public class ExportResultsFrameProcessorFactory implements FrameProcessorFactory
     return columnMappings;
   }
 
+  @JsonProperty("resultsContext")
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  @Nullable
+  public ResultsContext getResultsContext()
+  {
+    return resultsContext;
+  }
+
   @Override
   public ProcessorsAndChannels<Object, Object> makeProcessors(
       StageDefinition stageDefinition,
@@ -132,7 +144,7 @@ public class ExportResultsFrameProcessorFactory implements FrameProcessorFactory
       );
     }
 
-    ChannelCounters channelCounter = counters.channel(CounterNames.outputChannel());
+    final ChannelCounters channelCounter = counters.channel(CounterNames.outputChannel());
 
     final Sequence<ReadableInput> readableInputs =
         Sequences.simple(inputSliceReader.attach(0, slice, counters, warningPublisher));
@@ -145,7 +157,9 @@ public class ExportResultsFrameProcessorFactory implements FrameProcessorFactory
             frameContext.jsonMapper(),
             channelCounter,
             getExportFilePath(queryId, workerNumber, readableInput.getStagePartition().getPartitionNumber(), exportFormat),
-            columnMappings
+            columnMappings,
+            resultsContext,
+            readableInput.getStagePartition().getPartitionNumber()
         )
     );

MSQExportTest.java

@@ -62,8 +62,8 @@ public class MSQExportTest extends MSQTestBase
                      .verifyResults();
 
     Assert.assertEquals(
-        2, // result file and manifest file
-        Objects.requireNonNull(new File(exportDir.getAbsolutePath()).listFiles()).length
+        2, // result file and manifest file
+        Objects.requireNonNull(exportDir.listFiles()).length
     );
 
     File resultFile = new File(exportDir, "query-test-query-worker0-partition0.csv");
@@ -81,7 +81,7 @@ public class MSQExportTest extends MSQTestBase
                                             .add("dim1", ColumnType.STRING)
                                             .add("cnt", ColumnType.LONG).build();
 
-    File exportDir = newTempFolder("export");
+    final File exportDir = newTempFolder("export");
     final String sql = StringUtils.format("insert into extern(local(exportPath=>'%s')) as csv select dim1 as table_dim, count(*) as table_count from foo where dim1 = 'abc' group by 1", exportDir.getAbsolutePath());
 
     testIngestQuery().setSql(sql)
@@ -94,7 +94,7 @@ public class MSQExportTest extends MSQTestBase
 
     Assert.assertEquals(
         2,
-        Objects.requireNonNull(new File(exportDir.getAbsolutePath()).listFiles()).length
+        Objects.requireNonNull(exportDir.listFiles()).length
     );
@@ -133,7 +133,147 @@ public class MSQExportTest extends MSQTestBase
     Assert.assertEquals(
         expectedFooFileContents(false).size() + 1, // + 1 for the manifest file
-        Objects.requireNonNull(new File(exportDir.getAbsolutePath()).listFiles()).length
+        Objects.requireNonNull(exportDir.listFiles()).length
     );
   }
 
+  @Test
+  void testExportComplexColumns() throws IOException
+  {
+    final RowSignature rowSignature = RowSignature.builder()
+                                                  .add("__time", ColumnType.LONG)
+                                                  .add("a", ColumnType.LONG)
+                                                  .add("b", ColumnType.LONG)
+                                                  .add("c_json", ColumnType.STRING).build();
+
+    final File exportDir = newTempFolder("export");
+    final String sql = StringUtils.format("INSERT INTO\n"
+                                          + "EXTERN(local(exportPath=>'%s'))\n"
+                                          + "AS CSV\n"
+                                          + "SELECT\n"
+                                          + "  \"a\",\n"
+                                          + "  \"b\",\n"
+                                          + "  json_object(key 'c' value b) c_json\n"
+                                          + "FROM (\n"
+                                          + "  SELECT *\n"
+                                          + "  FROM TABLE(\n"
+                                          + "    EXTERN(\n"
+                                          + "      '{\"type\":\"inline\",\"data\":\"a,b\\n1,1\\n2,2\"}',\n"
+                                          + "      '{\"type\":\"csv\",\"findColumnsFromHeader\":true}'\n"
+                                          + "    )\n"
+                                          + "  ) EXTEND (\"a\" BIGINT, \"b\" BIGINT)\n"
+                                          + ")", exportDir.getAbsolutePath());
+
+    testIngestQuery().setSql(sql)
+                     .setExpectedDataSource("foo1")
+                     .setQueryContext(DEFAULT_MSQ_CONTEXT)
+                     .setExpectedRowSignature(rowSignature)
+                     .setExpectedSegment(ImmutableSet.of())
+                     .setExpectedResultRows(ImmutableList.of())
+                     .verifyResults();
+
+    Assert.assertEquals(
+        2, // result file and manifest file
+        Objects.requireNonNull(exportDir.listFiles()).length
+    );
+
+    File resultFile = new File(exportDir, "query-test-query-worker0-partition0.csv");
+    List<String> results = readResultsFromFile(resultFile);
+    Assert.assertEquals(
+        ImmutableList.of(
+            "a,b,c_json", "1,1,\"{\"\"c\"\":1}\"", "2,2,\"{\"\"c\"\":2}\""
+        ),
+        results
+    );
+  }
+
+  @Test
+  void testExportSketchColumns() throws IOException
+  {
+    final RowSignature rowSignature = RowSignature.builder()
+                                                  .add("__time", ColumnType.LONG)
+                                                  .add("a", ColumnType.LONG)
+                                                  .add("b", ColumnType.LONG)
+                                                  .add("c_json", ColumnType.STRING).build();
+
+    final File exportDir = newTempFolder("export");
+    final String sql = StringUtils.format("INSERT INTO\n"
+                                          + "EXTERN(local(exportPath=>'%s'))\n"
+                                          + "AS CSV\n"
+                                          + "SELECT\n"
+                                          + "  \"a\",\n"
+                                          + "  \"b\",\n"
+                                          + "  ds_hll(b) c_ds_hll\n"
+                                          + "FROM (\n"
+                                          + "  SELECT *\n"
+                                          + "  FROM TABLE(\n"
+                                          + "    EXTERN(\n"
+                                          + "      '{\"type\":\"inline\",\"data\":\"a,b\\n1,b1\\n2,b2\"}',\n"
+                                          + "      '{\"type\":\"csv\",\"findColumnsFromHeader\":true}'\n"
+                                          + "    )\n"
+                                          + "  ) EXTEND (\"a\" BIGINT, \"b\" VARCHAR)\n"
+                                          + ")\n"
+                                          + "GROUP BY 1,2", exportDir.getAbsolutePath());
+
+    testIngestQuery().setSql(sql)
+                     .setExpectedDataSource("foo1")
+                     .setQueryContext(DEFAULT_MSQ_CONTEXT)
+                     .setExpectedRowSignature(rowSignature)
+                     .setExpectedSegment(ImmutableSet.of())
+                     .setExpectedResultRows(ImmutableList.of())
+                     .verifyResults();
+
+    Assert.assertEquals(
+        2, // result file and manifest file
+        Objects.requireNonNull(exportDir.listFiles()).length
+    );
+
+    File resultFile = new File(exportDir, "query-test-query-worker0-partition0.csv");
+    List<String> results = readResultsFromFile(resultFile);
+    Assert.assertEquals(
+        ImmutableList.of(
+            "a,b,c_ds_hll", "1,b1,\"\"\"AgEHDAMIAQBa1y0L\"\"\"", "2,b2,\"\"\"AgEHDAMIAQCi6V0G\"\"\""
+        ),
+        results
+    );
+  }
+
+  @Test
+  void testEmptyExport() throws IOException
+  {
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add("__time", ColumnType.LONG)
+                                            .add("dim1", ColumnType.STRING)
+                                            .add("cnt", ColumnType.LONG).build();
+
+    File exportDir = newTempFolder("export");
+    final String sql = StringUtils.format("INSERT INTO "
+                                          + "EXTERN(local(exportPath=>'%s'))"
+                                          + "AS CSV "
+                                          + "SELECT cnt, dim1 AS dim "
+                                          + "FROM foo "
+                                          + "WHERE dim1='nonexistentvalue'", exportDir.getAbsolutePath());
+
+    testIngestQuery().setSql(sql)
+                     .setExpectedDataSource("foo1")
+                     .setQueryContext(DEFAULT_MSQ_CONTEXT)
+                     .setExpectedRowSignature(rowSignature)
+                     .setExpectedSegment(ImmutableSet.of())
+                     .setExpectedResultRows(ImmutableList.of())
+                     .verifyResults();
+
+    Assert.assertEquals(
+        2, // result file and manifest file
+        Objects.requireNonNull(exportDir.listFiles()).length
+    );
+
+    File resultFile = new File(exportDir, "query-test-query-worker0-partition0.csv");
+    List<String> results = readResultsFromFile(resultFile);
+    Assert.assertEquals(
+        ImmutableList.of(
+            "cnt,dim"
+        ),
+        results
+    );
+  }
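A note on the expected strings in the tests above: they are standard CSV quoting (RFC 4180 style) applied to the coerced SQL result. The JSON object {"c":1} becomes "{""c"":1}", and the HLL sketch coerces to a quoted JSON string, so "AgEHDAMIAQBa1y0L" picks up a second layer of quoting. A self-contained sketch of that escaping (plain Java, no Druid classes; the helper name is made up for illustration):

```java
public class CsvEscapeDemo
{
  // Minimal RFC 4180-style quoting: wrap the field in quotes, double embedded quotes.
  static String csvEscape(String field)
  {
    return "\"" + field.replace("\"", "\"\"") + "\"";
  }

  public static void main(String[] args)
  {
    System.out.println(csvEscape("{\"c\":1}"));            // "{""c"":1}"
    System.out.println(csvEscape("\"AgEHDAMIAQBa1y0L\"")); // """AgEHDAMIAQBa1y0L"""
  }
}
```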

ResultsContextSerdeTest.java (new file)

@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.query.QueryRunnerFactoryConglomerate;
+import org.apache.druid.query.QuerySegmentWalker;
+import org.apache.druid.server.security.AuthConfig;
+import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
+import org.apache.druid.sql.calcite.planner.CatalogResolver;
+import org.apache.druid.sql.calcite.planner.PlannerConfig;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.planner.PlannerToolbox;
+import org.apache.druid.sql.calcite.run.NativeSqlEngine;
+import org.apache.druid.sql.calcite.run.SqlResults;
+import org.apache.druid.sql.calcite.schema.DruidSchema;
+import org.apache.druid.sql.calcite.schema.DruidSchemaCatalog;
+import org.apache.druid.sql.calcite.schema.NamedDruidSchema;
+import org.apache.druid.sql.calcite.schema.NamedViewSchema;
+import org.apache.druid.sql.calcite.schema.ViewSchema;
+import org.apache.druid.sql.calcite.util.CalciteTests;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+
+public class ResultsContextSerdeTest
+{
+  private ResultsContext resultsContext;
+  private ObjectMapper objectMapper;
+
+  @Before
+  public void setUp()
+  {
+    final PlannerToolbox toolbox = new PlannerToolbox(
+        CalciteTests.createOperatorTable(),
+        CalciteTests.createExprMacroTable(),
+        CalciteTests.getJsonMapper(),
+        new PlannerConfig(),
+        new DruidSchemaCatalog(
+            EasyMock.createMock(SchemaPlus.class),
+            ImmutableMap.of(
+                "druid", new NamedDruidSchema(EasyMock.createMock(DruidSchema.class), "druid"),
+                NamedViewSchema.NAME, new NamedViewSchema(EasyMock.createMock(ViewSchema.class))
+            )
+        ),
+        CalciteTests.createJoinableFactoryWrapper(),
+        CatalogResolver.NULL_RESOLVER,
+        "druid",
+        new CalciteRulesManager(ImmutableSet.of()),
+        CalciteTests.TEST_AUTHORIZER_MAPPER,
+        AuthConfig.newBuilder().build()
+    );
+    final NativeSqlEngine engine = CalciteTests.createMockSqlEngine(
+        EasyMock.createMock(QuerySegmentWalker.class),
+        EasyMock.createMock(QueryRunnerFactoryConglomerate.class)
+    );
+    PlannerContext plannerContext = PlannerContext.create(
+        toolbox,
+        "DUMMY",
+        engine,
+        Collections.emptyMap(),
+        null
+    );
+    this.resultsContext = new ResultsContext(
+        ImmutableList.of(SqlTypeName.DOUBLE, SqlTypeName.TIMESTAMP, SqlTypeName.VARCHAR),
+        SqlResults.Context.fromPlannerContext(plannerContext)
+    );
+    this.objectMapper = new DefaultObjectMapper();
+  }
+
+  @Test
+  public void testSerde() throws JsonProcessingException
+  {
+    String s = objectMapper.writeValueAsString(resultsContext);
+    ResultsContext deserialized = objectMapper.readValue(s, ResultsContext.class);
+    Assert.assertEquals(resultsContext, deserialized);
+  }
+}

ExportResultsFrameProcessorFactoryTest.java

@@ -48,5 +48,6 @@ public class ExportResultsFrameProcessorFactoryTest
         ExportResultsFrameProcessorFactory.class
     );
     Assert.assertNull(exportResultsFrameProcessorFactory.getColumnMappings());
+    Assert.assertNull(exportResultsFrameProcessorFactory.getResultsContext());
   }
 }

MSQTestBase.java

@@ -530,6 +530,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
     objectMapper = setupObjectMapper(injector);
     objectMapper.registerModules(new StorageConnectorModule().getJacksonModules());
     objectMapper.registerModules(sqlModule.getJacksonModules());
+    objectMapper.registerModules(NestedDataModule.getJacksonModulesList());
 
     doReturn(mock(Request.class)).when(brokerClient).makeRequest(any(), anyString());