From 3df00aef9d27962b7f2262cd0055431273a5d067 Mon Sep 17 00:00:00 2001
From: Adarsh Sanjeev
Date: Mon, 15 Apr 2024 11:37:31 +0530
Subject: [PATCH] Add manifest file for MSQ export (#15953)

Currently, export creates the files at the provided destination. The addition
of the manifest file provides a list of the files created as part of the
export. This allows easier consumption of the data exported from Druid,
especially by automated data pipelines.
---
 docs/multi-stage-query/reference.md            | 10 ++
 .../output/GoogleExportStorageProvider.java    |  9 ++
 .../apache/druid/msq/exec/ControllerImpl.java  | 32 ++++++-
 .../druid/msq/exec/ExportMetadataManager.java  | 93 +++++++++++++++++++
 .../druid/msq/indexing/MSQControllerTask.java  |  6 ++
 .../destination/ExportMSQDestination.java      |  2 -
 .../results/ExportResultsFrameProcessor.java   |  3 +-
 .../ExportResultsFrameProcessorFactory.java    | 58 +++++++++++-
 .../apache/druid/msq/exec/MSQExportTest.java   | 55 ++++++++---
 .../apache/druid/msq/test/MSQTestBase.java     |  9 --
 .../s3/output/S3ExportStorageProvider.java     |  9 ++
 .../output/S3ExportStorageProviderTest.java    |  9 ++
 .../druid/storage/ExportStorageProvider.java   |  2 +
 .../local/LocalFileExportStorageProvider.java  |  8 ++
 .../LocalFileExportStorageProviderTest.java    | 13 +++
 .../druid/sql/calcite/CalciteExportTest.java   | 57 ++++++++----
 .../sql/calcite/export/TestExportModule.java   | 52 -----------
 .../export/TestExportStorageConnector.java     | 92 ------------------
 .../TestExportStorageConnectorProvider.java    | 46 ---------
 19 files changed, 321 insertions(+), 244 deletions(-)
 create mode 100644 extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExportMetadataManager.java
 delete mode 100644 sql/src/test/java/org/apache/druid/sql/calcite/export/TestExportModule.java
 delete mode 100644 sql/src/test/java/org/apache/druid/sql/calcite/export/TestExportStorageConnector.java
 delete mode 100644 sql/src/test/java/org/apache/druid/sql/calcite/export/TestExportStorageConnectorProvider.java

diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md
index f9c54b8e2e3..20d0ef2f7e6 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -99,6 +99,16 @@ For more information, see [Read external data with EXTERN](concepts.md#read-exte
 This variation of EXTERN requires one argument, the details of the destination as specified below.
 This variation additionally requires an `AS` clause to specify the format of the exported rows.
 
+When exporting data, Druid also creates some metadata files at the destination in addition to the data. These files are created in a directory named `_symlink_format_manifest`.
+- `_symlink_format_manifest/manifest`: Lists the files created as part of the export. The file follows the symlink manifest format and consists of a newline-separated list of absolute paths to the exported files.
+```text
+s3://export-bucket/export/query-6564a32f-2194-423a-912e-eead470a37c4-worker2-partition2.csv
+s3://export-bucket/export/query-6564a32f-2194-423a-912e-eead470a37c4-worker1-partition1.csv
+s3://export-bucket/export/query-6564a32f-2194-423a-912e-eead470a37c4-worker0-partition0.csv
+...
+s3://export-bucket/export/query-6564a32f-2194-423a-912e-eead470a37c4-worker0-partition24.csv
+```
+
 Keep the following in mind when using EXTERN to export rows:
 - Only INSERT statements are supported.
 - Only `CSV` format is supported as an export format.
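For automated pipelines, the manifest is the stable entry point for discovering which files a given export produced. Below is a minimal, hypothetical consumer sketch (not part of this patch) showing how a downstream job might read the manifest to enumerate the exported files; it assumes the export destination is a locally accessible directory:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class ExportManifestReader
{
  public static void main(String[] args) throws IOException
  {
    // Export destination passed as the first argument; the manifest lives under _symlink_format_manifest.
    final Path exportDir = Path.of(args[0]);
    final Path manifest = exportDir.resolve("_symlink_format_manifest").resolve("manifest");

    // Each line of the manifest is the absolute path (or URI) of one exported data file.
    final List<String> exportedFiles = Files.readAllLines(manifest, StandardCharsets.UTF_8);
    for (String exportedFile : exportedFiles) {
      System.out.println("Exported file: " + exportedFile);
    }
  }
}
```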
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleExportStorageProvider.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleExportStorageProvider.java index 480b80e118a..8d0c6b50b31 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleExportStorageProvider.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleExportStorageProvider.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; import org.apache.druid.data.input.google.GoogleCloudStorageInputSource; import org.apache.druid.data.input.impl.CloudObjectLocation; import org.apache.druid.error.DruidException; @@ -44,6 +45,8 @@ import java.util.List; public class GoogleExportStorageProvider implements ExportStorageProvider { public static final String TYPE_NAME = GoogleCloudStorageInputSource.TYPE_KEY; + private static final String DELIM = "/"; + private static final Joiner JOINER = Joiner.on(DELIM).skipNulls(); @JsonProperty private final String bucket; @JsonProperty @@ -146,4 +149,10 @@ public class GoogleExportStorageProvider implements ExportStorageProvider { return new CloudObjectLocation(bucket, prefix).toUri(GoogleStorageDruidModule.SCHEME_GS).toString(); } + + @Override + public String getFilePathForManifest(String fileName) + { + return new CloudObjectLocation(bucket, JOINER.join(prefix, fileName)).toUri(GoogleStorageDruidModule.SCHEME_GS).toString(); + } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 918a4fd2969..878492d8a6e 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -433,7 +433,7 @@ public class ControllerImpl implements Controller queryKernel = Preconditions.checkNotNull(queryRunResult.lhs); workerTaskRunnerFuture = Preconditions.checkNotNull(queryRunResult.rhs); resultsYielder = getFinalResultsYielder(queryDef, queryKernel); - publishSegmentsIfNeeded(queryDef, queryKernel); + handleQueryResults(queryDef, queryKernel); } catch (Throwable e) { exceptionEncountered = e; @@ -1746,12 +1746,16 @@ public class ControllerImpl implements Controller } } - private void publishSegmentsIfNeeded( + private void handleQueryResults( final QueryDefinition queryDef, final ControllerQueryKernel queryKernel ) throws IOException { - if (queryKernel.isSuccess() && MSQControllerTask.isIngestion(task.getQuerySpec())) { + if (!queryKernel.isSuccess()) { + return; + } + if (MSQControllerTask.isIngestion(task.getQuerySpec())) { + // Publish segments if needed. final StageId finalStageId = queryKernel.getStageId(queryDef.getFinalStageDefinition().getStageNumber()); //noinspection unchecked @@ -1790,6 +1794,25 @@ public class ControllerImpl implements Controller } log.info("Query [%s] publishing %d segments.", queryDef.getQueryId(), segments.size()); publishAllSegments(segments); + } else if (MSQControllerTask.isExport(task.getQuerySpec())) { + // Write manifest file. 
+      ExportMSQDestination destination = (ExportMSQDestination) task.getQuerySpec().getDestination();
+      ExportMetadataManager exportMetadataManager = new ExportMetadataManager(destination.getExportStorageProvider());
+
+      final StageId finalStageId = queryKernel.getStageId(queryDef.getFinalStageDefinition().getStageNumber());
+      //noinspection unchecked
+      Object resultObjectForStage = queryKernel.getResultObjectForStage(finalStageId);
+      if (!(resultObjectForStage instanceof List)) {
+        // This might occur if all workers are running on an older version. We are not able to write a manifest file in this case.
+        log.warn("Unable to create export manifest file. The final stage did not return a list of exported files; this can happen if the workers are running an older version.");
+        return;
+      }
+      @SuppressWarnings("unchecked")
+      List<String> exportedFiles = (List<String>) resultObjectForStage;
+      log.info("Query [%s] exported %d files.", queryDef.getQueryId(), exportedFiles.size());
+      exportMetadataManager.writeMetadata(exportedFiles);
     }
   }
 
@@ -2018,7 +2041,7 @@ public class ControllerImpl implements Controller
       } else {
         return queryDef;
       }
-    } else if (querySpec.getDestination() instanceof ExportMSQDestination) {
+    } else if (MSQControllerTask.isExport(querySpec)) {
       final ExportMSQDestination exportMSQDestination = (ExportMSQDestination) querySpec.getDestination();
       final ExportStorageProvider exportStorageProvider = exportMSQDestination.getExportStorageProvider();
 
@@ -2063,7 +2086,6 @@ public class ControllerImpl implements Controller
     }
   }
 
-
   private static DataSchema generateDataSchema(
       MSQSpec querySpec,
       RowSignature querySignature,
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExportMetadataManager.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExportMetadataManager.java
new file mode 100644
index 00000000000..3b9d0296de5
--- /dev/null
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExportMetadataManager.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.storage.ExportStorageProvider;
+import org.apache.druid.storage.StorageConnector;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/**
+ * Manages writing of metadata files during export queries.
+ */
+public class ExportMetadataManager
+{
+  public static final String SYMLINK_DIR = "_symlink_format_manifest";
+  public static final String MANIFEST_FILE = SYMLINK_DIR + "/manifest";
+  public static final String META_FILE = SYMLINK_DIR + "/druid_export_meta";
+  public static final int MANIFEST_FILE_VERSION = 1;
+  private static final Logger log = new Logger(ExportMetadataManager.class);
+  private final ExportStorageProvider exportStorageProvider;
+
+  public ExportMetadataManager(final ExportStorageProvider exportStorageProvider)
+  {
+    this.exportStorageProvider = exportStorageProvider;
+  }
+
+  public void writeMetadata(List<String> exportedFiles) throws IOException
+  {
+    final StorageConnector storageConnector = exportStorageProvider.get();
+    log.info("Writing manifest file at location [%s]", exportStorageProvider.getBasePath());
+
+    if (storageConnector.pathExists(MANIFEST_FILE) || storageConnector.pathExists(META_FILE)) {
+      throw DruidException.forPersona(DruidException.Persona.USER)
+                          .ofCategory(DruidException.Category.RUNTIME_FAILURE)
+                          .build("Found an existing manifest file at the export destination path.");
+    }
+
+    createManifestFile(storageConnector, exportedFiles);
+    createDruidMetadataFile(storageConnector);
+  }
+
+  /**
+   * Creates a manifest file containing the list of files created by the export query. The manifest file consists of a
+   * newline-separated list; each line contains the absolute path to a file created by the export.
+   */
+  public void createManifestFile(StorageConnector storageConnector, List<String> exportedFiles) throws IOException
+  {
+    try (PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(storageConnector.write(MANIFEST_FILE), StandardCharsets.UTF_8))) {
+      for (String exportedFile : exportedFiles) {
+        printWriter.println(exportStorageProvider.getFilePathForManifest(exportedFile));
+      }
+    }
+  }
+
+  /**
+   * Creates a druid metadata file at the export location. This file contains extra information about the export which
+   * cannot be stored in the manifest itself, since the manifest must follow the symlink format.
+   * <p>
+ * Currently, this only contains the manifest file version. + */ + private void createDruidMetadataFile(StorageConnector storageConnector) throws IOException + { + // Write the export manifest metadata information. + // This includes only the version number currently. + try (PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(storageConnector.write(META_FILE), StandardCharsets.UTF_8))) { + printWriter.println(StringUtils.format("version: %s", MANIFEST_FILE_VERSION)); + } + } +} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java index 7eb455ca842..64cecdfbf25 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java @@ -50,6 +50,7 @@ import org.apache.druid.msq.exec.ControllerImpl; import org.apache.druid.msq.exec.MSQTasks; import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination; import org.apache.druid.msq.indexing.destination.DurableStorageMSQDestination; +import org.apache.druid.msq.indexing.destination.ExportMSQDestination; import org.apache.druid.msq.indexing.destination.MSQDestination; import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.query.QueryContext; @@ -286,6 +287,11 @@ public class MSQControllerTask extends AbstractTask implements ClientTaskQuery return querySpec.getDestination() instanceof DataSourceMSQDestination; } + public static boolean isExport(final MSQSpec querySpec) + { + return querySpec.getDestination() instanceof ExportMSQDestination; + } + /** * Returns true if the task reads from the same table as the destionation. In this case, we would prefer to fail * instead of reading any unused segments to ensure that old data is not read. 
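To make the behavior of `ExportMetadataManager` concrete, the sketch below reproduces the layout it writes — a `manifest` listing exported files and a `druid_export_meta` file carrying the manifest version — using plain `java.nio` instead of a Druid `StorageConnector`; the destination path and file list are hypothetical:

```java
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class SymlinkManifestSketch
{
  public static void main(String[] args) throws IOException
  {
    // Hypothetical export destination and exported files; real values come from the export query.
    final Path exportDir = Path.of("/tmp/druid-export");
    final List<String> exportedFiles = List.of(
        "file:/tmp/druid-export/query-test-query-worker0-partition0.csv"
    );

    final Path symlinkDir = exportDir.resolve("_symlink_format_manifest");
    Files.createDirectories(symlinkDir);

    // manifest: newline-separated absolute paths of the exported data files.
    try (PrintWriter writer = new PrintWriter(
        Files.newBufferedWriter(symlinkDir.resolve("manifest"), StandardCharsets.UTF_8))) {
      exportedFiles.forEach(writer::println);
    }

    // druid_export_meta: extra export metadata; currently just the manifest format version.
    try (PrintWriter writer = new PrintWriter(
        Files.newBufferedWriter(symlinkDir.resolve("druid_export_meta"), StandardCharsets.UTF_8))) {
      writer.println("version: 1");
    }
  }
}
```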
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/ExportMSQDestination.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/ExportMSQDestination.java index 3187ace349b..14ac0ce4c2e 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/ExportMSQDestination.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/ExportMSQDestination.java @@ -52,8 +52,6 @@ public class ExportMSQDestination implements MSQDestination this.resultFormat = resultFormat; } - - @JsonProperty("exportStorageProvider") public ExportStorageProvider getExportStorageProvider() { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java index 52697578b07..56b287781c2 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java @@ -33,7 +33,6 @@ import org.apache.druid.frame.processor.ReturnOrAwait; import org.apache.druid.frame.read.FrameReader; import org.apache.druid.frame.segment.FrameStorageAdapter; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.Unit; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.msq.counters.ChannelCounters; @@ -130,7 +129,7 @@ public class ExportResultsFrameProcessor implements FrameProcessor } if (inputChannel.isFinished()) { - return ReturnOrAwait.returnObject(Unit.instance()); + return ReturnOrAwait.returnObject(exportFilePath); } else { exportFrame(inputChannel.read()); return ReturnOrAwait.awaitAll(1); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactory.java index 5fe9b52191c..beb626f0fce 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactory.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactory.java @@ -23,11 +23,13 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; +import com.fasterxml.jackson.core.type.TypeReference; import org.apache.druid.error.DruidException; import org.apache.druid.frame.processor.FrameProcessor; import org.apache.druid.frame.processor.OutputChannelFactory; import org.apache.druid.frame.processor.OutputChannels; import org.apache.druid.frame.processor.manager.ProcessorManagers; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; @@ -38,21 +40,24 @@ import org.apache.druid.msq.input.InputSlice; import org.apache.druid.msq.input.InputSliceReader; import 
org.apache.druid.msq.input.ReadableInput; import org.apache.druid.msq.input.stage.StageInputSlice; +import org.apache.druid.msq.kernel.ExtraInfoHolder; import org.apache.druid.msq.kernel.FrameContext; +import org.apache.druid.msq.kernel.FrameProcessorFactory; +import org.apache.druid.msq.kernel.NilExtraInfoHolder; import org.apache.druid.msq.kernel.ProcessorsAndChannels; import org.apache.druid.msq.kernel.StageDefinition; -import org.apache.druid.msq.querykit.BaseFrameProcessorFactory; import org.apache.druid.sql.calcite.planner.ColumnMappings; import org.apache.druid.sql.http.ResultFormat; import org.apache.druid.storage.ExportStorageProvider; import org.apache.druid.utils.CollectionUtils; import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.List; import java.util.function.Consumer; @JsonTypeName("exportResults") -public class ExportResultsFrameProcessorFactory extends BaseFrameProcessorFactory +public class ExportResultsFrameProcessorFactory implements FrameProcessorFactory { private final String queryId; private final ExportStorageProvider exportStorageProvider; @@ -101,7 +106,7 @@ public class ExportResultsFrameProcessorFactory extends BaseFrameProcessorFactor } @Override - public ProcessorsAndChannels makeProcessors( + public ProcessorsAndChannels makeProcessors( StageDefinition stageDefinition, int workerNumber, List inputSlices, @@ -120,7 +125,11 @@ public class ExportResultsFrameProcessorFactory extends BaseFrameProcessorFactor ); if (inputSliceReader.numReadableInputs(slice) == 0) { - return new ProcessorsAndChannels<>(ProcessorManagers.none(), OutputChannels.none()); + return new ProcessorsAndChannels<>( + ProcessorManagers.of(Sequences.empty()) + .withAccumulation(new ArrayList(), (acc, file) -> acc), + OutputChannels.none() + ); } ChannelCounters channelCounter = counters.channel(CounterNames.outputChannel()); @@ -141,11 +150,50 @@ public class ExportResultsFrameProcessorFactory extends BaseFrameProcessorFactor ); return new ProcessorsAndChannels<>( - ProcessorManagers.of(processors), + ProcessorManagers.of(processors) + .withAccumulation(new ArrayList(), (acc, file) -> { + ((ArrayList) acc).add((String) file); + return acc; + }), OutputChannels.none() ); } + @Nullable + @Override + public TypeReference getResultTypeReference() + { + return new TypeReference() {}; + } + + @Override + public Object mergeAccumulatedResult(Object accumulated, Object otherAccumulated) + { + // If a worker does not return a list, fail the query + if (!(accumulated instanceof List)) { + throw DruidException.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build("Expected a list result from worker, received [%s] instead. This might be due to workers having an older version.", accumulated.getClass()); + } + if (!(otherAccumulated instanceof List)) { + throw DruidException.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build("Expected a list result from worker, received [%s] instead. 
This might be due to workers having an older version.", otherAccumulated.getClass()); + } + ((List) accumulated).addAll((List) otherAccumulated); + return accumulated; + } + + @Override + public ExtraInfoHolder makeExtraInfoHolder(@Nullable Object extra) + { + if (extra != null) { + throw new ISE("Expected null 'extra'"); + } + + return NilExtraInfoHolder.instance(); + } + private static String getExportFilePath(String queryId, int workerNumber, int partitionNumber, ResultFormat exportFormat) { return StringUtils.format("%s-worker%s-partition%s.%s", queryId, workerNumber, partitionNumber, exportFormat.toString()); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQExportTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQExportTest.java index 4481d046c92..edc98dcea98 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQExportTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQExportTest.java @@ -62,7 +62,7 @@ public class MSQExportTest extends MSQTestBase .verifyResults(); Assert.assertEquals( - 1, + 2, // result file and manifest file Objects.requireNonNull(new File(exportDir.getAbsolutePath()).listFiles()).length ); @@ -93,16 +93,19 @@ public class MSQExportTest extends MSQTestBase .verifyResults(); Assert.assertEquals( - 1, + 2, Objects.requireNonNull(new File(exportDir.getAbsolutePath()).listFiles()).length ); + File resultFile = new File(exportDir, "query-test-query-worker0-partition0.csv"); List results = readResultsFromFile(resultFile); Assert.assertEquals( expectedFoo2FileContents(true), results ); + + verifyManifestFile(exportDir, ImmutableList.of(resultFile)); } @Test @@ -129,7 +132,7 @@ public class MSQExportTest extends MSQTestBase .verifyResults(); Assert.assertEquals( - expectedFooFileContents(false).size(), + expectedFooFileContents(false).size() + 1, // + 1 for the manifest file Objects.requireNonNull(new File(exportDir.getAbsolutePath()).listFiles()).length ); } @@ -141,14 +144,13 @@ public class MSQExportTest extends MSQTestBase expectedResults.add("cnt,dim"); } expectedResults.addAll(ImmutableList.of( - "1,", - "1,10.1", - "1,2", - "1,1", - "1,def", - "1,abc" - ) - ); + "1,", + "1,10.1", + "1,2", + "1,1", + "1,def", + "1,abc" + )); return expectedResults; } @@ -173,4 +175,35 @@ public class MSQExportTest extends MSQTestBase return results; } } + + private void verifyManifestFile(File exportDir, List resultFiles) throws IOException + { + final File manifestFile = new File(exportDir, ExportMetadataManager.MANIFEST_FILE); + try ( + BufferedReader bufferedReader = new BufferedReader( + new InputStreamReader(Files.newInputStream(manifestFile.toPath()), StringUtils.UTF8_STRING) + ) + ) { + for (File file : resultFiles) { + Assert.assertEquals( + StringUtils.format("file:%s", file.getAbsolutePath()), + bufferedReader.readLine() + ); + } + Assert.assertNull(bufferedReader.readLine()); + } + + final File metaFile = new File(exportDir, ExportMetadataManager.META_FILE); + try ( + BufferedReader bufferedReader = new BufferedReader( + new InputStreamReader(Files.newInputStream(metaFile.toPath()), StringUtils.UTF8_STRING) + ) + ) { + Assert.assertEquals( + StringUtils.format("version: %s", ExportMetadataManager.MANIFEST_FILE_VERSION), + bufferedReader.readLine() + ); + Assert.assertNull(bufferedReader.readLine()); + } + } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index 57f052e6f36..a3b6fa3d458 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -159,8 +159,6 @@ import org.apache.druid.sql.SqlQueryPlus; import org.apache.druid.sql.SqlStatementFactory; import org.apache.druid.sql.SqlToolbox; import org.apache.druid.sql.calcite.BaseCalciteQueryTest; -import org.apache.druid.sql.calcite.export.TestExportStorageConnector; -import org.apache.druid.sql.calcite.export.TestExportStorageConnectorProvider; import org.apache.druid.sql.calcite.external.ExternalDataSource; import org.apache.druid.sql.calcite.external.ExternalOperatorConversion; import org.apache.druid.sql.calcite.external.HttpOperatorConversion; @@ -318,7 +316,6 @@ public class MSQTestBase extends BaseCalciteQueryTest protected SqlStatementFactory sqlStatementFactory; protected AuthorizerMapper authorizerMapper; private IndexIO indexIO; - protected TestExportStorageConnectorProvider exportStorageConnectorProvider = new TestExportStorageConnectorProvider(); // Contains the metadata of loaded segments protected List loadedSegmentsMetadata = new ArrayList<>(); // Mocks the return of data from data servers @@ -512,12 +509,6 @@ public class MSQTestBase extends BaseCalciteQueryTest .build(); objectMapper = setupObjectMapper(injector); - objectMapper.registerModule( - new SimpleModule(StorageConnector.class.getSimpleName()) - .registerSubtypes( - new NamedType(TestExportStorageConnectorProvider.class, TestExportStorageConnector.TYPE_NAME) - ) - ); objectMapper.registerModules(new StorageConnectorModule().getJacksonModules()); objectMapper.registerModules(sqlModule.getJacksonModules()); diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportStorageProvider.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportStorageProvider.java index 7577f56f76f..9b03a4f07c7 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportStorageProvider.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportStorageProvider.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; import org.apache.druid.data.input.impl.CloudObjectLocation; import org.apache.druid.data.input.s3.S3InputSource; import org.apache.druid.error.DruidException; @@ -43,6 +44,8 @@ import java.util.List; public class S3ExportStorageProvider implements ExportStorageProvider { public static final String TYPE_NAME = S3InputSource.TYPE_KEY; + private static final String DELIM = "/"; + private static final Joiner JOINER = Joiner.on(DELIM).skipNulls(); @JsonProperty private final String bucket; @JsonProperty @@ -143,4 +146,10 @@ public class S3ExportStorageProvider implements ExportStorageProvider { return new CloudObjectLocation(bucket, prefix).toUri(S3StorageDruidModule.SCHEME).toString(); } + + @Override + public String getFilePathForManifest(String fileName) + { + return new CloudObjectLocation(bucket, JOINER.join(prefix, fileName)).toUri(S3StorageDruidModule.SCHEME).toString(); + } } diff --git 
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3ExportStorageProviderTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3ExportStorageProviderTest.java index 362f8583fd1..0a87c3a4c01 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3ExportStorageProviderTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3ExportStorageProviderTest.java @@ -57,4 +57,13 @@ public class S3ExportStorageProviderTest () -> S3ExportStorageProvider.validateS3Prefix(validPrefixes, "bucket-name", "validPath123") ); } + + @Test + public void testExportManifestFilePath() + { + Assert.assertEquals( + "s3://export-bucket/export/table/file1", + new S3ExportStorageProvider("export-bucket", "export/table").getFilePathForManifest("file1") + ); + } } diff --git a/processing/src/main/java/org/apache/druid/storage/ExportStorageProvider.java b/processing/src/main/java/org/apache/druid/storage/ExportStorageProvider.java index 890ac577b1a..173544e2f8d 100644 --- a/processing/src/main/java/org/apache/druid/storage/ExportStorageProvider.java +++ b/processing/src/main/java/org/apache/druid/storage/ExportStorageProvider.java @@ -31,4 +31,6 @@ public interface ExportStorageProvider extends Provider * Return a URI representation of the base path. This is used to be used for logging and error messages. */ String getBasePath(); + + String getFilePathForManifest(String fileName); } diff --git a/processing/src/main/java/org/apache/druid/storage/local/LocalFileExportStorageProvider.java b/processing/src/main/java/org/apache/druid/storage/local/LocalFileExportStorageProvider.java index f0d4c87b41f..74b099aef88 100644 --- a/processing/src/main/java/org/apache/druid/storage/local/LocalFileExportStorageProvider.java +++ b/processing/src/main/java/org/apache/druid/storage/local/LocalFileExportStorageProvider.java @@ -27,6 +27,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName; import org.apache.druid.data.input.impl.LocalInputSource; import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.storage.ExportStorageProvider; import org.apache.druid.storage.StorageConfig; import org.apache.druid.storage.StorageConnector; @@ -83,6 +84,13 @@ public class LocalFileExportStorageProvider implements ExportStorageProvider return exportPath; } + @Override + public String getFilePathForManifest(String fileName) + { + final File exportFile = new File(exportPath, fileName); + return StringUtils.format("file:%s", exportFile.toPath().normalize()); + } + @Override public boolean equals(Object o) { diff --git a/processing/src/test/java/org/apache/druid/storage/local/LocalFileExportStorageProviderTest.java b/processing/src/test/java/org/apache/druid/storage/local/LocalFileExportStorageProviderTest.java index 4daef2f9cd9..752720dcff7 100644 --- a/processing/src/test/java/org/apache/druid/storage/local/LocalFileExportStorageProviderTest.java +++ b/processing/src/test/java/org/apache/druid/storage/local/LocalFileExportStorageProviderTest.java @@ -90,4 +90,17 @@ public class LocalFileExportStorageProviderTest () -> LocalFileExportStorageProvider.validateAndGetPath("/base", "/base1") ); } + + @Test + public void testExportManifestFilePath() + { + Assert.assertEquals( + "file:/base/path1/file1", + new LocalFileExportStorageProvider("/base/path1").getFilePathForManifest("file1") + ); + 
Assert.assertEquals( + "file:/base/path1/file1", + new LocalFileExportStorageProvider("/base/../base/path1").getFilePathForManifest("file1") + ); + } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteExportTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteExportTest.java index eaf6af9e77b..c78aa536d30 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteExportTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteExportTest.java @@ -20,6 +20,7 @@ package org.apache.druid.sql.calcite; import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; @@ -33,14 +34,13 @@ import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.server.security.ForbiddenException; -import org.apache.druid.sql.calcite.export.TestExportModule; -import org.apache.druid.sql.calcite.export.TestExportStorageConnector; import org.apache.druid.sql.calcite.filtration.Filtration; import org.apache.druid.sql.calcite.util.CalciteTests; import org.apache.druid.sql.destination.ExportDestination; import org.apache.druid.sql.http.SqlParameter; import org.apache.druid.storage.StorageConfig; import org.apache.druid.storage.StorageConnector; +import org.apache.druid.storage.StorageConnectorProvider; import org.apache.druid.storage.local.LocalFileExportStorageProvider; import org.apache.druid.storage.local.LocalFileStorageConnectorProvider; import org.hamcrest.CoreMatchers; @@ -57,7 +57,24 @@ public class CalciteExportTest extends CalciteIngestionDmlTest public void configureGuice(DruidInjectorBuilder builder) { super.configureGuice(builder); - builder.addModule(new TestExportModule()); + builder.addModule( + new DruidModule() + { + @Override + public void configure(Binder binder) + { + } + + @Override + public List getJacksonModules() + { + return ImmutableList.of( + new SimpleModule(StorageConnectorProvider.class.getSimpleName()).registerSubtypes( + new NamedType(LocalFileExportStorageProvider.class, CalciteTests.FORBIDDEN_DESTINATION) + ) + ); + } + }); builder.addModule(new DruidModule() { @Override @@ -84,10 +101,10 @@ public class CalciteExportTest extends CalciteIngestionDmlTest public void testReplaceIntoExtern() { testIngestionQuery() - .sql(StringUtils.format("REPLACE INTO EXTERN(%s(basePath => 'export')) " + .sql(StringUtils.format("REPLACE INTO EXTERN(%s(exportPath => 'export')) " + "AS CSV " + "OVERWRITE ALL " - + "SELECT dim2 FROM foo", TestExportStorageConnector.TYPE_NAME)) + + "SELECT dim2 FROM foo", LocalFileExportStorageProvider.TYPE_NAME)) .expectQuery( Druids.newScanQueryBuilder() .dataSource( @@ -99,7 +116,7 @@ public class CalciteExportTest extends CalciteIngestionDmlTest .legacy(false) .build() ) - .expectResources(dataSourceRead("foo"), externalWrite(TestExportStorageConnector.TYPE_NAME)) + .expectResources(dataSourceRead("foo"), externalWrite(LocalFileExportStorageProvider.TYPE_NAME)) .expectTarget(ExportDestination.TYPE_KEY, RowSignature.builder().add("dim2", ColumnType.STRING).build()) .verify(); } @@ -108,10 +125,10 @@ public class CalciteExportTest extends CalciteIngestionDmlTest public void testReplaceIntoExternShouldThrowUnsupportedException() { testIngestionQuery() - .sql(StringUtils.format("REPLACE INTO EXTERN(%s(basePath => 'export')) " + 
.sql(StringUtils.format("REPLACE INTO EXTERN(%s(exportPath => 'export')) " + "AS CSV " + "OVERWRITE ALL " - + "SELECT dim2 FROM foo", TestExportStorageConnector.TYPE_NAME)) + + "SELECT dim2 FROM foo", LocalFileExportStorageProvider.TYPE_NAME)) .expectValidationError( CoreMatchers.allOf( CoreMatchers.instanceOf(DruidException.class), @@ -145,10 +162,10 @@ public class CalciteExportTest extends CalciteIngestionDmlTest public void testExportWithPartitionedBy() { testIngestionQuery() - .sql(StringUtils.format("INSERT INTO EXTERN(%s()) " + .sql(StringUtils.format("INSERT INTO EXTERN(%s(exportPath=>'/tmp/export')) " + "AS CSV " + "SELECT dim2 FROM foo " - + "PARTITIONED BY ALL", TestExportStorageConnector.TYPE_NAME)) + + "PARTITIONED BY ALL", LocalFileStorageConnectorProvider.TYPE_NAME)) .expectValidationError( DruidException.class, "Export statements do not support a PARTITIONED BY or CLUSTERED BY clause." @@ -160,9 +177,9 @@ public class CalciteExportTest extends CalciteIngestionDmlTest public void testInsertIntoExtern() { testIngestionQuery() - .sql(StringUtils.format("INSERT INTO EXTERN(%s()) " + .sql(StringUtils.format("INSERT INTO EXTERN(%s(exportPath=>'/tmp/export')) " + "AS CSV " - + "SELECT dim2 FROM foo", TestExportStorageConnector.TYPE_NAME)) + + "SELECT dim2 FROM foo", LocalFileStorageConnectorProvider.TYPE_NAME)) .expectQuery( Druids.newScanQueryBuilder() .dataSource( @@ -174,7 +191,7 @@ public class CalciteExportTest extends CalciteIngestionDmlTest .legacy(false) .build() ) - .expectResources(dataSourceRead("foo"), externalWrite(TestExportStorageConnector.TYPE_NAME)) + .expectResources(dataSourceRead("foo"), externalWrite(LocalFileStorageConnectorProvider.TYPE_NAME)) .expectTarget(ExportDestination.TYPE_KEY, RowSignature.builder().add("dim2", ColumnType.STRING).build()) .verify(); } @@ -184,9 +201,9 @@ public class CalciteExportTest extends CalciteIngestionDmlTest public void testInsertIntoExternParameterized() { testIngestionQuery() - .sql(StringUtils.format("INSERT INTO EXTERN(%s()) " + .sql(StringUtils.format("INSERT INTO EXTERN(%s(exportPath=>'/tmp/export')) " + "AS CSV " - + "SELECT dim2 FROM foo WHERE dim2=?", TestExportStorageConnector.TYPE_NAME)) + + "SELECT dim2 FROM foo WHERE dim2=?", LocalFileStorageConnectorProvider.TYPE_NAME)) .parameters(Collections.singletonList(new SqlParameter(SqlType.VARCHAR, "val"))) .expectQuery( Druids.newScanQueryBuilder() @@ -200,7 +217,7 @@ public class CalciteExportTest extends CalciteIngestionDmlTest .legacy(false) .build() ) - .expectResources(dataSourceRead("foo"), externalWrite(TestExportStorageConnector.TYPE_NAME)) + .expectResources(dataSourceRead("foo"), externalWrite(LocalFileStorageConnectorProvider.TYPE_NAME)) .expectTarget(ExportDestination.TYPE_KEY, RowSignature.builder().add("dim2", ColumnType.STRING).build()) .verify(); } @@ -211,9 +228,9 @@ public class CalciteExportTest extends CalciteIngestionDmlTest public void testReplaceIntoExternParameterized() { testIngestionQuery() - .sql(StringUtils.format("REPLACE INTO EXTERN(%s()) " + .sql(StringUtils.format("REPLACE INTO EXTERN(%s(exportPath=>'/tmp/export')) " + "AS CSV " - + "SELECT dim2 FROM foo WHERE dim2=?", TestExportStorageConnector.TYPE_NAME)) + + "SELECT dim2 FROM foo WHERE dim2=?", LocalFileStorageConnectorProvider.TYPE_NAME)) .parameters(Collections.singletonList(new SqlParameter(SqlType.VARCHAR, "val"))) .expectQuery( Druids.newScanQueryBuilder() @@ -227,7 +244,7 @@ public class CalciteExportTest extends CalciteIngestionDmlTest .legacy(false) .build() ) - 
.expectResources(dataSourceRead("foo"), externalWrite(TestExportStorageConnector.TYPE_NAME)) + .expectResources(dataSourceRead("foo"), externalWrite(LocalFileStorageConnectorProvider.TYPE_NAME)) .expectTarget(ExportDestination.TYPE_KEY, RowSignature.builder().add("dim2", ColumnType.STRING).build()) .verify(); } @@ -263,7 +280,7 @@ public class CalciteExportTest extends CalciteIngestionDmlTest public void testWithForbiddenDestination() { testIngestionQuery() - .sql(StringUtils.format("insert into extern(%s()) as csv select __time, dim1 from foo", CalciteTests.FORBIDDEN_DESTINATION)) + .sql(StringUtils.format("insert into extern(%s(exportPath=>'/tmp/export')) as csv select __time, dim1 from foo", CalciteTests.FORBIDDEN_DESTINATION)) .expectValidationError(ForbiddenException.class) .verify(); } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/export/TestExportModule.java b/sql/src/test/java/org/apache/druid/sql/calcite/export/TestExportModule.java deleted file mode 100644 index b6969f4c165..00000000000 --- a/sql/src/test/java/org/apache/druid/sql/calcite/export/TestExportModule.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.sql.calcite.export; - -import com.fasterxml.jackson.databind.Module; -import com.fasterxml.jackson.databind.jsontype.NamedType; -import com.fasterxml.jackson.databind.module.SimpleModule; -import com.google.common.collect.ImmutableList; -import com.google.inject.Binder; -import org.apache.druid.initialization.DruidModule; -import org.apache.druid.sql.calcite.util.CalciteTests; -import org.apache.druid.storage.StorageConnectorProvider; - -import java.util.List; - -public class TestExportModule implements DruidModule -{ - @Override - public List getJacksonModules() - { - return ImmutableList.of( - new SimpleModule(StorageConnectorProvider.class.getSimpleName()) - .registerSubtypes( - new NamedType(TestExportStorageConnectorProvider.class, TestExportStorageConnector.TYPE_NAME), - new NamedType(TestExportStorageConnectorProvider.class, CalciteTests.FORBIDDEN_DESTINATION) - ) - ); - } - - @Override - public void configure(Binder binder) - { - - } -} diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/export/TestExportStorageConnector.java b/sql/src/test/java/org/apache/druid/sql/calcite/export/TestExportStorageConnector.java deleted file mode 100644 index b81b22ceb87..00000000000 --- a/sql/src/test/java/org/apache/druid/sql/calcite/export/TestExportStorageConnector.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.sql.calcite.export; - -import com.google.common.collect.ImmutableList; -import org.apache.druid.storage.StorageConnector; - -import java.io.ByteArrayOutputStream; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Iterator; - -public class TestExportStorageConnector implements StorageConnector -{ - public static final String TYPE_NAME = "testStorage"; - private final ByteArrayOutputStream byteArrayOutputStream; - - public TestExportStorageConnector() - { - this.byteArrayOutputStream = new ByteArrayOutputStream(); - } - - public ByteArrayOutputStream getByteArrayOutputStream() - { - return byteArrayOutputStream; - } - - @Override - public boolean pathExists(String path) - { - return true; - } - - @Override - public InputStream read(String path) - { - throw new UnsupportedOperationException(); - } - - @Override - public InputStream readRange(String path, long from, long size) - { - throw new UnsupportedOperationException(); - } - - @Override - public OutputStream write(String path) - { - return byteArrayOutputStream; - } - - @Override - public void deleteFile(String path) - { - throw new UnsupportedOperationException(); - } - - @Override - public void deleteFiles(Iterable paths) - { - throw new UnsupportedOperationException(); - } - - @Override - public void deleteRecursively(String path) - { - throw new UnsupportedOperationException(); - } - - @Override - public Iterator listDir(String dirName) - { - return ImmutableList.of().stream().iterator(); - } -} diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/export/TestExportStorageConnectorProvider.java b/sql/src/test/java/org/apache/druid/sql/calcite/export/TestExportStorageConnectorProvider.java deleted file mode 100644 index b1ca59e2ccc..00000000000 --- a/sql/src/test/java/org/apache/druid/sql/calcite/export/TestExportStorageConnectorProvider.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.druid.sql.calcite.export; - -import org.apache.druid.storage.ExportStorageProvider; -import org.apache.druid.storage.StorageConnector; - -public class TestExportStorageConnectorProvider implements ExportStorageProvider -{ - private static final StorageConnector STORAGE_CONNECTOR = new TestExportStorageConnector(); - - @Override - public StorageConnector get() - { - return STORAGE_CONNECTOR; - } - - @Override - public String getResourceType() - { - return "testExport"; - } - - @Override - public String getBasePath() - { - return "testExport"; - } -}