From f82cc34e5b21028831ac3ea9ee71c39938c9b5c6 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Wed, 8 May 2024 11:34:20 +0530 Subject: [PATCH] Maintain a connection while exporting results with MSQ (#16381) * Maintain a connection while exporting results with MSQ * Fix checkstyle * Fix checkstyle * Move initialization from constructor * Add null check * Address review comments --- .../results/ExportResultsFrameProcessor.java | 104 ++++++++++-------- .../druid/storage/StorageConnector.java | 7 +- .../local/LocalFileStorageConnector.java | 2 +- 3 files changed, 66 insertions(+), 47 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java index 56b287781c2..e3635338231 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java @@ -61,10 +61,12 @@ public class ExportResultsFrameProcessor implements FrameProcessor private final StorageConnector storageConnector; private final ObjectMapper jsonMapper; private final ChannelCounters channelCounter; - final String exportFilePath; + private final String exportFilePath; private final Object2IntMap outputColumnNameToFrameColumnNumberMap; private final RowSignature exportRowSignature; + private volatile ResultFormat.Writer exportWriter; + public ExportResultsFrameProcessor( final ReadableFrameChannel inputChannel, final ResultFormat exportFormat, @@ -129,65 +131,77 @@ public class ExportResultsFrameProcessor implements FrameProcessor } if (inputChannel.isFinished()) { + exportWriter.writeResponseEnd(); return ReturnOrAwait.returnObject(exportFilePath); } else { + if (exportWriter == null) { + createExportWriter(); + } exportFrame(inputChannel.read()); return ReturnOrAwait.awaitAll(1); } } - private void exportFrame(final Frame frame) throws IOException + private void exportFrame(final Frame frame) { final Sequence cursorSequence = new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY) .makeCursors(null, Intervals.ETERNITY, VirtualColumns.EMPTY, Granularities.ALL, false, null); - // Add headers if we are writing to a new file. - final boolean writeHeader = !storageConnector.pathExists(exportFilePath); + SequenceUtils.forEach( + cursorSequence, + cursor -> { + try { + final ColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory(); - try (OutputStream stream = storageConnector.write(exportFilePath)) { - ResultFormat.Writer formatter = exportFormat.createFormatter(stream, jsonMapper); - formatter.writeResponseStart(); + //noinspection rawtypes + final List selectors = + frameReader.signature() + .getColumnNames() + .stream() + .map(columnSelectorFactory::makeColumnValueSelector) + .collect(Collectors.toList()); - if (writeHeader) { - formatter.writeHeaderFromRowSignature(exportRowSignature, false); - } - - SequenceUtils.forEach( - cursorSequence, - cursor -> { - try { - final ColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory(); - - //noinspection rawtypes - @SuppressWarnings("rawtypes") - final List selectors = - frameReader.signature() - .getColumnNames() - .stream() - .map(columnSelectorFactory::makeColumnValueSelector) - .collect(Collectors.toList()); - - while (!cursor.isDone()) { - formatter.writeRowStart(); - for (int j = 0; j < exportRowSignature.size(); j++) { - String columnName = exportRowSignature.getColumnName(j); - BaseObjectColumnValueSelector selector = selectors.get(outputColumnNameToFrameColumnNumberMap.getInt(columnName)); - formatter.writeRowField(columnName, selector.getObject()); - } - channelCounter.incrementRowCount(); - formatter.writeRowEnd(); - cursor.advance(); + while (!cursor.isDone()) { + exportWriter.writeRowStart(); + for (int j = 0; j < exportRowSignature.size(); j++) { + String columnName = exportRowSignature.getColumnName(j); + BaseObjectColumnValueSelector selector = selectors.get(outputColumnNameToFrameColumnNumberMap.getInt(columnName)); + exportWriter.writeRowField(columnName, selector.getObject()); } - } - catch (IOException e) { - throw DruidException.forPersona(DruidException.Persona.USER) - .ofCategory(DruidException.Category.RUNTIME_FAILURE) - .build(e, "Exception occurred while writing file to the export location [%s].", exportFilePath); + channelCounter.incrementRowCount(); + exportWriter.writeRowEnd(); + cursor.advance(); } } - ); - formatter.writeResponseEnd(); + catch (IOException e) { + throw DruidException.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build(e, "Exception occurred while writing file to the export location [%s].", exportFilePath); + } + } + ); + } + + private void createExportWriter() throws IOException + { + OutputStream stream = null; + try { + stream = storageConnector.write(exportFilePath); + exportWriter = exportFormat.createFormatter(stream, jsonMapper); + exportWriter.writeResponseStart(); + exportWriter.writeHeaderFromRowSignature(exportRowSignature, false); + } + catch (IOException e) { + if (exportWriter != null) { + exportWriter.close(); + } + if (stream != null) { + stream.close(); + } + throw DruidException.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build(e, "Exception occurred while opening a stream to the export location [%s].", exportFilePath); } } @@ -195,5 +209,9 @@ public class ExportResultsFrameProcessor implements FrameProcessor public void cleanup() throws IOException { FrameProcessors.closeAll(inputChannels(), outputChannels()); + + if (exportWriter != null) { + exportWriter.close(); + } } } diff --git a/processing/src/main/java/org/apache/druid/storage/StorageConnector.java b/processing/src/main/java/org/apache/druid/storage/StorageConnector.java index 3d1ead89b1e..d99d9469f08 100644 --- a/processing/src/main/java/org/apache/druid/storage/StorageConnector.java +++ b/processing/src/main/java/org/apache/druid/storage/StorageConnector.java @@ -90,12 +90,13 @@ public interface StorageConnector /** * Open an {@link OutputStream} for writing data to the path in the underlying storage system. - * Most implementations prepend the input path with a basePath. + * Most implementations prepend the input path with a basePath. If an object exists at the path, + * the existing object will be overwritten by the write operation. * Callers are adivised to namespace there files as there might be race conditions. * The caller should take care of closing the stream when done or in case of error. * - * @param path - * @return + * @param path to write + * @return OutputStream to the path * @throws IOException */ OutputStream write(String path) throws IOException; diff --git a/processing/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnector.java b/processing/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnector.java index 225f3eb8537..3d96f8d43b1 100644 --- a/processing/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnector.java +++ b/processing/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnector.java @@ -101,7 +101,7 @@ public class LocalFileStorageConnector implements StorageConnector { File toWrite = fileWithBasePath(path); FileUtils.mkdirp(toWrite.getParentFile()); - return Files.newOutputStream(toWrite.toPath(), StandardOpenOption.CREATE, StandardOpenOption.APPEND); + return Files.newOutputStream(toWrite.toPath()); } /**