Fix incorrect header names for certain export queries (#16096)

* Fix incorrect header names for certain queries

* Maintain upgrade compatibility

* Fix tests

* Change null handling
Adarsh Sanjeev 2024-03-19 15:11:04 +05:30 committed by GitHub
parent f5c5573da8
commit a151bcfd12
5 changed files with 184 additions and 51 deletions
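The gist of the change, before the per-file diffs: the export processor previously wrote headers straight from the frame's row signature, so aliased columns (e.g. select dim1 as dim) came out under their internal query names. The controller now passes the query's ColumnMappings down to ExportResultsFrameProcessor, which builds a map from each output column name to the backing frame column index and writes rows under the output names. A minimal standalone sketch of that mapping follows; the class and method names are hypothetical (the real code uses an Object2IntOpenHashMap keyed off the frame reader's RowSignature):

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only; names here are hypothetical, not Druid APIs.
class HeaderMappingSketch
{
  // One (queryColumn -> outputColumn) pair, mirroring a ColumnMapping.
  record Mapping(String queryColumn, String outputColumn) {}

  // For each user-facing output name, record the index of the frame column
  // that backs it, so values can be read by frame position while the header
  // is written with the alias from the SELECT list.
  static Map<String, Integer> outputNameToFrameIndex(List<Mapping> mappings, List<String> frameColumns)
  {
    final Map<String, Integer> indexByOutputName = new LinkedHashMap<>();
    for (Mapping m : mappings) {
      indexByOutputName.put(m.outputColumn(), frameColumns.indexOf(m.queryColumn()));
    }
    return indexByOutputName;
  }

  public static void main(String[] args)
  {
    // For "select cnt, dim1 as dim from foo", the frame keeps query names...
    Map<String, Integer> index = outputNameToFrameIndex(
        List.of(new Mapping("cnt", "cnt"), new Mapping("dim1", "dim")),
        List.of("cnt", "dim1")
    );
    // ...but the export header now reads "cnt,dim": prints {cnt=0, dim=1}
    System.out.println(index);
  }
}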

ControllerImpl.java

@@ -1905,7 +1905,8 @@ public class ControllerImpl implements Controller
.processorFactory(new ExportResultsFrameProcessorFactory(
queryId,
exportStorageProvider,
resultFormat
resultFormat,
columnMappings
))
);
return builder.build();

ExportResultsFrameProcessor.java

@@ -21,6 +21,8 @@ package org.apache.druid.msq.querykit.results;
import com.fasterxml.jackson.databind.ObjectMapper;
import it.unimi.dsi.fastutil.ints.IntSet;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.channel.ReadableFrameChannel;
@@ -35,13 +37,14 @@ import org.apache.druid.java.util.common.Unit;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.querykit.QueryKitUtils;
import org.apache.druid.msq.util.SequenceUtils;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.planner.ColumnMapping;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
import org.apache.druid.sql.http.ResultFormat;
import org.apache.druid.storage.StorageConnector;
@@ -60,6 +63,8 @@ public class ExportResultsFrameProcessor implements FrameProcessor<Object>
private final ObjectMapper jsonMapper;
private final ChannelCounters channelCounter;
final String exportFilePath;
private final Object2IntMap<String> outputColumnNameToFrameColumnNumberMap;
private final RowSignature exportRowSignature;
public ExportResultsFrameProcessor(
final ReadableFrameChannel inputChannel,
@@ -68,7 +73,8 @@ public class ExportResultsFrameProcessor implements FrameProcessor<Object>
final StorageConnector storageConnector,
final ObjectMapper jsonMapper,
final ChannelCounters channelCounter,
final String exportFilePath
final String exportFilePath,
final ColumnMappings columnMappings
)
{
this.inputChannel = inputChannel;
@@ -78,6 +84,30 @@ public class ExportResultsFrameProcessor implements FrameProcessor<Object>
this.jsonMapper = jsonMapper;
this.channelCounter = channelCounter;
this.exportFilePath = exportFilePath;
this.outputColumnNameToFrameColumnNumberMap = new Object2IntOpenHashMap<>();
final RowSignature inputRowSignature = frameReader.signature();
if (columnMappings == null) {
// If the column mappings weren't sent, fail the query to avoid inconsistency in the file format.
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
.build("Received null columnMappings from controller. This might be due to an upgrade.");
}
for (final ColumnMapping columnMapping : columnMappings.getMappings()) {
this.outputColumnNameToFrameColumnNumberMap.put(
columnMapping.getOutputColumn(),
frameReader.signature().indexOf(columnMapping.getQueryColumn())
);
}
final RowSignature.Builder exportRowSignatureBuilder = RowSignature.builder();
for (String outputColumn : columnMappings.getOutputColumnNames()) {
exportRowSignatureBuilder.add(
outputColumn,
inputRowSignature.getColumnType(outputColumnNameToFrameColumnNumberMap.getInt(outputColumn)).orElse(null)
);
}
this.exportRowSignature = exportRowSignatureBuilder.build();
}
@Override
@@ -109,8 +139,6 @@ public class ExportResultsFrameProcessor implements FrameProcessor<Object>
private void exportFrame(final Frame frame) throws IOException
{
final RowSignature exportRowSignature = createRowSignatureForExport(frameReader.signature());
final Sequence<Cursor> cursorSequence =
new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY)
.makeCursors(null, Intervals.ETERNITY, VirtualColumns.EMPTY, Granularities.ALL, false, null);
@@ -135,7 +163,7 @@ public class ExportResultsFrameProcessor implements FrameProcessor<Object>
//noinspection rawtypes
@SuppressWarnings("rawtypes")
final List<BaseObjectColumnValueSelector> selectors =
exportRowSignature
frameReader.signature()
.getColumnNames()
.stream()
.map(columnSelectorFactory::makeColumnValueSelector)
@@ -144,7 +172,9 @@ public class ExportResultsFrameProcessor implements FrameProcessor<Object>
while (!cursor.isDone()) {
formatter.writeRowStart();
for (int j = 0; j < exportRowSignature.size(); j++) {
formatter.writeRowField(exportRowSignature.getColumnName(j), selectors.get(j).getObject());
String columnName = exportRowSignature.getColumnName(j);
BaseObjectColumnValueSelector<?> selector = selectors.get(outputColumnNameToFrameColumnNumberMap.getInt(columnName));
formatter.writeRowField(columnName, selector.getObject());
}
channelCounter.incrementRowCount();
formatter.writeRowEnd();
@@ -162,16 +192,6 @@ public class ExportResultsFrameProcessor implements FrameProcessor<Object>
}
}
private static RowSignature createRowSignatureForExport(RowSignature inputRowSignature)
{
RowSignature.Builder exportRowSignatureBuilder = RowSignature.builder();
inputRowSignature.getColumnNames()
.stream()
.filter(name -> !QueryKitUtils.PARTITION_BOOST_COLUMN.equals(name))
.forEach(name -> exportRowSignatureBuilder.add(name, inputRowSignature.getColumnType(name).orElse(null)));
return exportRowSignatureBuilder.build();
}
@Override
public void cleanup() throws IOException
{

ExportResultsFrameProcessorFactory.java

@@ -20,6 +20,7 @@
package org.apache.druid.msq.querykit.results;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.druid.error.DruidException;
@@ -41,6 +42,7 @@ import org.apache.druid.msq.kernel.FrameContext;
import org.apache.druid.msq.kernel.ProcessorsAndChannels;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.querykit.BaseFrameProcessorFactory;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
import org.apache.druid.sql.http.ResultFormat;
import org.apache.druid.storage.ExportStorageProvider;
import org.apache.druid.utils.CollectionUtils;
@@ -55,17 +57,20 @@ public class ExportResultsFrameProcessorFactory extends BaseFrameProcessorFactory
private final String queryId;
private final ExportStorageProvider exportStorageProvider;
private final ResultFormat exportFormat;
private final ColumnMappings columnMappings;
@JsonCreator
public ExportResultsFrameProcessorFactory(
@JsonProperty("queryId") String queryId,
@JsonProperty("exportStorageProvider") ExportStorageProvider exportStorageProvider,
@JsonProperty("exportFormat") ResultFormat exportFormat
@JsonProperty("exportFormat") ResultFormat exportFormat,
@JsonProperty("columnMappings") @Nullable ColumnMappings columnMappings
)
{
this.queryId = queryId;
this.exportStorageProvider = exportStorageProvider;
this.exportFormat = exportFormat;
this.columnMappings = columnMappings;
}
@JsonProperty("queryId")
@@ -87,6 +92,14 @@ public class ExportResultsFrameProcessorFactory extends BaseFrameProcessorFactory
return exportStorageProvider;
}
@JsonProperty("columnMappings")
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
public ColumnMappings getColumnMappings()
{
return columnMappings;
}
@Override
public ProcessorsAndChannels<Object, Long> makeProcessors(
StageDefinition stageDefinition,
@@ -122,7 +135,8 @@ public class ExportResultsFrameProcessorFactory extends BaseFrameProcessorFactory
exportStorageProvider.get(),
frameContext.jsonMapper(),
channelCounter,
getExportFilePath(queryId, workerNumber, readableInput.getStagePartition().getPartitionNumber(), exportFormat)
getExportFilePath(queryId, workerNumber, readableInput.getStagePartition().getPartitionNumber(), exportFormat),
columnMappings
)
);

MSQExportTest.java

@@ -26,15 +26,14 @@ import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.export.TestExportStorageConnector;
import org.apache.druid.sql.http.ResultFormat;
import org.junit.Assert;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -46,14 +45,13 @@ public class MSQExportTest extends MSQTestBase
@Test
public void testExport() throws IOException
{
TestExportStorageConnector storageConnector = (TestExportStorageConnector) exportStorageConnectorProvider.get();
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("dim1", ColumnType.STRING)
.add("cnt", ColumnType.LONG).build();
final String sql = StringUtils.format("insert into extern(%s()) as csv select cnt, dim1 from foo", TestExportStorageConnector.TYPE_NAME);
File exportDir = temporaryFolder.newFolder("export/");
final String sql = StringUtils.format("insert into extern(local(exportPath=>'%s')) as csv select cnt, dim1 as dim from foo", exportDir.getAbsolutePath());
testIngestQuery().setSql(sql)
.setExpectedDataSource("foo1")
@@ -63,11 +61,47 @@ public class MSQExportTest extends MSQTestBase
.setExpectedResultRows(ImmutableList.of())
.verifyResults();
List<Object[]> objects = expectedFooFileContents();
Assert.assertEquals(
1,
Objects.requireNonNull(new File(exportDir.getAbsolutePath()).listFiles()).length
);
File resultFile = new File(exportDir, "query-test-query-worker0-partition0.csv");
List<String> results = readResultsFromFile(resultFile);
Assert.assertEquals(
expectedFooFileContents(true),
results
);
}
@Test
public void testExport2() throws IOException
{
RowSignature rowSignature = RowSignature.builder()
.add("dim1", ColumnType.STRING)
.add("cnt", ColumnType.LONG).build();
File exportDir = temporaryFolder.newFolder("export/");
final String sql = StringUtils.format("insert into extern(local(exportPath=>'%s')) as csv select dim1 as table_dim, count(*) as table_count from foo where dim1 = 'abc' group by 1", exportDir.getAbsolutePath());
testIngestQuery().setSql(sql)
.setExpectedDataSource("foo1")
.setQueryContext(DEFAULT_MSQ_CONTEXT)
.setExpectedRowSignature(rowSignature)
.setExpectedSegment(ImmutableSet.of())
.setExpectedResultRows(ImmutableList.of())
.verifyResults();
Assert.assertEquals(
convertResultsToString(objects),
new String(storageConnector.getByteArrayOutputStream().toByteArray(), Charset.defaultCharset())
1,
Objects.requireNonNull(new File(exportDir.getAbsolutePath()).listFiles()).length
);
File resultFile = new File(exportDir, "query-test-query-worker0-partition0.csv");
List<String> results = readResultsFromFile(resultFile);
Assert.assertEquals(
expectedFoo2FileContents(true),
results
);
}
@@ -95,36 +129,48 @@ public class MSQExportTest extends MSQTestBase
.verifyResults();
Assert.assertEquals(
expectedFooFileContents().size(),
expectedFooFileContents(false).size(),
Objects.requireNonNull(new File(exportDir.getAbsolutePath()).listFiles()).length
);
}
private List<Object[]> expectedFooFileContents()
private List<String> expectedFooFileContents(boolean withHeader)
{
return new ArrayList<>(ImmutableList.of(
new Object[]{"1", null},
new Object[]{"1", 10.1},
new Object[]{"1", 2},
new Object[]{"1", 1},
new Object[]{"1", "def"},
new Object[]{"1", "abc"}
));
ArrayList<String> expectedResults = new ArrayList<>();
if (withHeader) {
expectedResults.add("cnt,dim");
}
expectedResults.addAll(ImmutableList.of(
"1,",
"1,10.1",
"1,2",
"1,1",
"1,def",
"1,abc"
)
);
return expectedResults;
}
private String convertResultsToString(List<Object[]> expectedRows) throws IOException
private List<String> expectedFoo2FileContents(boolean withHeader)
{
ByteArrayOutputStream expectedResult = new ByteArrayOutputStream();
ResultFormat.Writer formatter = ResultFormat.CSV.createFormatter(expectedResult, objectMapper);
formatter.writeResponseStart();
for (Object[] row : expectedRows) {
formatter.writeRowStart();
for (Object object : row) {
formatter.writeRowField("", object);
ArrayList<String> expectedResults = new ArrayList<>();
if (withHeader) {
expectedResults.add("table_dim,table_count");
}
formatter.writeRowEnd();
expectedResults.addAll(ImmutableList.of("abc,1"));
return expectedResults;
}
private List<String> readResultsFromFile(File resultFile) throws IOException
{
List<String> results = new ArrayList<>();
try (BufferedReader br = new BufferedReader(new InputStreamReader(Files.newInputStream(resultFile.toPath()), StringUtils.UTF8_STRING))) {
String line;
while ((line = br.readLine()) != null && !line.isEmpty()) {
results.add(line);
}
return results;
}
formatter.writeResponseEnd();
return new String(expectedResult.toByteArray(), Charset.defaultCharset());
}
}

ExportResultsFrameProcessorFactoryTest.java

@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.querykit.results;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.storage.StorageConfig;
import org.apache.druid.storage.StorageConnectorModule;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
public class ExportResultsFrameProcessorFactoryTest
{
@Test
public void testSerde() throws IOException
{
String exportFactoryString = "{\"type\":\"exportResults\",\"queryId\":\"query-9128ieio9wq\",\"exportStorageProvider\":{\"type\":\"local\",\"exportPath\":\"/path\"},\"exportFormat\":\"csv\",\"resultTypeReference\":{\"type\":\"java.lang.Object\"}}";
ObjectMapper objectMapper = new DefaultObjectMapper();
objectMapper.registerModules(new StorageConnectorModule().getJacksonModules());
objectMapper.setInjectableValues(
new InjectableValues.Std()
.addValue(StorageConfig.class, new StorageConfig("/"))
);
ExportResultsFrameProcessorFactory exportResultsFrameProcessorFactory = objectMapper.readValue(
exportFactoryString,
ExportResultsFrameProcessorFactory.class
);
Assert.assertNull(exportResultsFrameProcessorFactory.getColumnMappings());
}
}
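
This test pins the rolling-upgrade path: a payload from an older controller has no columnMappings field, it deserializes to null, and the processor then fails the export with a clear error instead of writing wrong headers. For contrast, a hedged sketch of the forward path, assuming ColumnMappings serializes as a JSON array of objects with queryColumn and outputColumn fields (as it does elsewhere in MSQ); this snippet is illustrative, not part of the committed test:

// Hypothetical companion check, reusing the objectMapper setup from testSerde():
String newPayload = "{\"type\":\"exportResults\",\"queryId\":\"query-9128ieio9wq\","
    + "\"exportStorageProvider\":{\"type\":\"local\",\"exportPath\":\"/path\"},"
    + "\"exportFormat\":\"csv\","
    + "\"columnMappings\":[{\"queryColumn\":\"dim1\",\"outputColumn\":\"dim\"}]}";
ExportResultsFrameProcessorFactory withMappings = objectMapper.readValue(
    newPayload,
    ExportResultsFrameProcessorFactory.class
);
// An upgraded worker reads the mappings back and renames dim1 -> dim on export.
Assert.assertNotNull(withMappings.getColumnMappings());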