Maintain a connection while exporting results with MSQ (#16381)

* Maintain a connection while exporting results with MSQ

* Fix checkstyle

* Fix checkstyle

* Move initialization from constructor

* Add null check

* Address review comments
This commit is contained in:
Adarsh Sanjeev 2024-05-08 11:34:20 +05:30 committed by GitHub
parent 269e035e76
commit f82cc34e5b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 66 additions and 47 deletions

View File

@ -61,10 +61,12 @@ public class ExportResultsFrameProcessor implements FrameProcessor<Object>
private final StorageConnector storageConnector;
private final ObjectMapper jsonMapper;
private final ChannelCounters channelCounter;
final String exportFilePath;
private final String exportFilePath;
private final Object2IntMap<String> outputColumnNameToFrameColumnNumberMap;
private final RowSignature exportRowSignature;
private volatile ResultFormat.Writer exportWriter;
public ExportResultsFrameProcessor(
final ReadableFrameChannel inputChannel,
final ResultFormat exportFormat,
@ -129,65 +131,77 @@ public class ExportResultsFrameProcessor implements FrameProcessor<Object>
}
if (inputChannel.isFinished()) {
exportWriter.writeResponseEnd();
return ReturnOrAwait.returnObject(exportFilePath);
} else {
if (exportWriter == null) {
createExportWriter();
}
exportFrame(inputChannel.read());
return ReturnOrAwait.awaitAll(1);
}
}
private void exportFrame(final Frame frame) throws IOException
private void exportFrame(final Frame frame)
{
final Sequence<Cursor> cursorSequence =
new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY)
.makeCursors(null, Intervals.ETERNITY, VirtualColumns.EMPTY, Granularities.ALL, false, null);
// Add headers if we are writing to a new file.
final boolean writeHeader = !storageConnector.pathExists(exportFilePath);
SequenceUtils.forEach(
cursorSequence,
cursor -> {
try {
final ColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory();
try (OutputStream stream = storageConnector.write(exportFilePath)) {
ResultFormat.Writer formatter = exportFormat.createFormatter(stream, jsonMapper);
formatter.writeResponseStart();
//noinspection rawtypes
final List<BaseObjectColumnValueSelector> selectors =
frameReader.signature()
.getColumnNames()
.stream()
.map(columnSelectorFactory::makeColumnValueSelector)
.collect(Collectors.toList());
if (writeHeader) {
formatter.writeHeaderFromRowSignature(exportRowSignature, false);
}
SequenceUtils.forEach(
cursorSequence,
cursor -> {
try {
final ColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory();
//noinspection rawtypes
@SuppressWarnings("rawtypes")
final List<BaseObjectColumnValueSelector> selectors =
frameReader.signature()
.getColumnNames()
.stream()
.map(columnSelectorFactory::makeColumnValueSelector)
.collect(Collectors.toList());
while (!cursor.isDone()) {
formatter.writeRowStart();
for (int j = 0; j < exportRowSignature.size(); j++) {
String columnName = exportRowSignature.getColumnName(j);
BaseObjectColumnValueSelector<?> selector = selectors.get(outputColumnNameToFrameColumnNumberMap.getInt(columnName));
formatter.writeRowField(columnName, selector.getObject());
}
channelCounter.incrementRowCount();
formatter.writeRowEnd();
cursor.advance();
while (!cursor.isDone()) {
exportWriter.writeRowStart();
for (int j = 0; j < exportRowSignature.size(); j++) {
String columnName = exportRowSignature.getColumnName(j);
BaseObjectColumnValueSelector<?> selector = selectors.get(outputColumnNameToFrameColumnNumberMap.getInt(columnName));
exportWriter.writeRowField(columnName, selector.getObject());
}
}
catch (IOException e) {
throw DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
.build(e, "Exception occurred while writing file to the export location [%s].", exportFilePath);
channelCounter.incrementRowCount();
exportWriter.writeRowEnd();
cursor.advance();
}
}
);
formatter.writeResponseEnd();
catch (IOException e) {
throw DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
.build(e, "Exception occurred while writing file to the export location [%s].", exportFilePath);
}
}
);
}
private void createExportWriter() throws IOException
{
OutputStream stream = null;
try {
stream = storageConnector.write(exportFilePath);
exportWriter = exportFormat.createFormatter(stream, jsonMapper);
exportWriter.writeResponseStart();
exportWriter.writeHeaderFromRowSignature(exportRowSignature, false);
}
catch (IOException e) {
if (exportWriter != null) {
exportWriter.close();
}
if (stream != null) {
stream.close();
}
throw DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
.build(e, "Exception occurred while opening a stream to the export location [%s].", exportFilePath);
}
}
@ -195,5 +209,9 @@ public class ExportResultsFrameProcessor implements FrameProcessor<Object>
public void cleanup() throws IOException
{
FrameProcessors.closeAll(inputChannels(), outputChannels());
if (exportWriter != null) {
exportWriter.close();
}
}
}

View File

@ -90,12 +90,13 @@ public interface StorageConnector
/**
* Open an {@link OutputStream} for writing data to the path in the underlying storage system.
* Most implementations prepend the input path with a basePath.
* Most implementations prepend the input path with a basePath. If an object exists at the path,
* the existing object will be overwritten by the write operation.
* Callers are adivised to namespace there files as there might be race conditions.
* The caller should take care of closing the stream when done or in case of error.
*
* @param path
* @return
* @param path to write
* @return OutputStream to the path
* @throws IOException
*/
OutputStream write(String path) throws IOException;

View File

@ -101,7 +101,7 @@ public class LocalFileStorageConnector implements StorageConnector
{
File toWrite = fileWithBasePath(path);
FileUtils.mkdirp(toWrite.getParentFile());
return Files.newOutputStream(toWrite.toPath(), StandardOpenOption.CREATE, StandardOpenOption.APPEND);
return Files.newOutputStream(toWrite.toPath());
}
/**