Add manifest file for MSQ export (#15953)

Currently, export creates the files at the provided destination. This change adds a manifest file that lists the files created as part of the export, which allows easier consumption of data exported from Druid, especially by automated data pipelines.
Adarsh Sanjeev 2024-04-15 11:37:31 +05:30 committed by GitHub
parent 81d7b6ebe1
commit 3df00aef9d
19 changed files with 321 additions and 244 deletions

File: MSQ export documentation

@ -99,6 +99,16 @@ For more information, see [Read external data with EXTERN](concepts.md#read-exte
This variation of EXTERN requires one argument, the details of the destination as specified below.
This variation additionally requires an `AS` clause to specify the format of the exported rows.
While exporting data, the query also creates some metadata files at the destination, in a directory named `_symlink_format_manifest`.
- `_symlink_format_manifest/manifest`: Lists the files which were created as part of the export. The file is in the symlink manifest format, and consists of a list of absolute paths to the files created.
```text
s3://export-bucket/export/query-6564a32f-2194-423a-912e-eead470a37c4-worker2-partition2.csv
s3://export-bucket/export/query-6564a32f-2194-423a-912e-eead470a37c4-worker1-partition1.csv
s3://export-bucket/export/query-6564a32f-2194-423a-912e-eead470a37c4-worker0-partition0.csv
...
s3://export-bucket/export/query-6564a32f-2194-423a-912e-eead470a37c4-worker0-partition24.csv
```
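Because the manifest is a plain, newline-separated list of absolute paths, downstream pipelines can consume it directly. The following is a minimal, hypothetical sketch (the `ManifestReader` class and the local manifest path are illustrative, not part of Druid) that reads an already-downloaded manifest and prints the exported file paths:
```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

// Hypothetical consumer of the export manifest; not part of Druid.
public class ManifestReader
{
  public static void main(String[] args) throws IOException
  {
    // Each line of the manifest is the absolute path of one exported data file.
    final List<String> exportedFiles = Files.readAllLines(
        Paths.get("_symlink_format_manifest/manifest"),
        StandardCharsets.UTF_8
    );
    exportedFiles.forEach(System.out::println);
  }
}
```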
Keep the following in mind when using EXTERN to export rows:
- Only INSERT statements are supported.
- Only `CSV` format is supported as an export format.

File: GoogleExportStorageProvider.java

@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import org.apache.druid.data.input.google.GoogleCloudStorageInputSource;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.error.DruidException;
@ -44,6 +45,8 @@ import java.util.List;
public class GoogleExportStorageProvider implements ExportStorageProvider
{
public static final String TYPE_NAME = GoogleCloudStorageInputSource.TYPE_KEY;
private static final String DELIM = "/";
private static final Joiner JOINER = Joiner.on(DELIM).skipNulls();
@JsonProperty
private final String bucket;
@JsonProperty
@ -146,4 +149,10 @@ public class GoogleExportStorageProvider implements ExportStorageProvider
{
return new CloudObjectLocation(bucket, prefix).toUri(GoogleStorageDruidModule.SCHEME_GS).toString();
}
@Override
public String getFilePathForManifest(String fileName)
{
return new CloudObjectLocation(bucket, JOINER.join(prefix, fileName)).toUri(GoogleStorageDruidModule.SCHEME_GS).toString();
}
}

File: ControllerImpl.java

@ -433,7 +433,7 @@ public class ControllerImpl implements Controller
queryKernel = Preconditions.checkNotNull(queryRunResult.lhs);
workerTaskRunnerFuture = Preconditions.checkNotNull(queryRunResult.rhs);
resultsYielder = getFinalResultsYielder(queryDef, queryKernel);
publishSegmentsIfNeeded(queryDef, queryKernel);
handleQueryResults(queryDef, queryKernel);
}
catch (Throwable e) {
exceptionEncountered = e;
@ -1746,12 +1746,16 @@ public class ControllerImpl implements Controller
}
}
private void publishSegmentsIfNeeded(
private void handleQueryResults(
final QueryDefinition queryDef,
final ControllerQueryKernel queryKernel
) throws IOException
{
if (queryKernel.isSuccess() && MSQControllerTask.isIngestion(task.getQuerySpec())) {
if (!queryKernel.isSuccess()) {
return;
}
if (MSQControllerTask.isIngestion(task.getQuerySpec())) {
// Publish segments if needed.
final StageId finalStageId = queryKernel.getStageId(queryDef.getFinalStageDefinition().getStageNumber());
//noinspection unchecked
@ -1790,6 +1794,25 @@ public class ControllerImpl implements Controller
}
log.info("Query [%s] publishing %d segments.", queryDef.getQueryId(), segments.size());
publishAllSegments(segments);
} else if (MSQControllerTask.isExport(task.getQuerySpec())) {
// Write manifest file.
ExportMSQDestination destination = (ExportMSQDestination) task.getQuerySpec().getDestination();
ExportMetadataManager exportMetadataManager = new ExportMetadataManager(destination.getExportStorageProvider());
final StageId finalStageId = queryKernel.getStageId(queryDef.getFinalStageDefinition().getStageNumber());
//noinspection unchecked
Object resultObjectForStage = queryKernel.getResultObjectForStage(finalStageId);
if (!(resultObjectForStage instanceof List)) {
// This might occur if all workers are running on an older version. We are not able to write a manifest file in this case.
log.warn("Was unable to create manifest file due to ");
return;
}
@SuppressWarnings("unchecked")
List<String> exportedFiles = (List<String>) queryKernel.getResultObjectForStage(finalStageId);
log.info("Query [%s] exported %d files.", queryDef.getQueryId(), exportedFiles.size());
exportMetadataManager.writeMetadata(exportedFiles);
}
}
@ -2018,7 +2041,7 @@ public class ControllerImpl implements Controller
} else {
return queryDef;
}
} else if (querySpec.getDestination() instanceof ExportMSQDestination) {
} else if (MSQControllerTask.isExport(querySpec)) {
final ExportMSQDestination exportMSQDestination = (ExportMSQDestination) querySpec.getDestination();
final ExportStorageProvider exportStorageProvider = exportMSQDestination.getExportStorageProvider();
@ -2063,7 +2086,6 @@ public class ControllerImpl implements Controller
}
}
private static DataSchema generateDataSchema(
MSQSpec querySpec,
RowSignature querySignature,

File: ExportMetadataManager.java (new file)

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.exec;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.storage.ExportStorageProvider;
import org.apache.druid.storage.StorageConnector;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.util.List;
/**
* Manages writing of metadata files during export queries.
*/
public class ExportMetadataManager
{
public static final String SYMLINK_DIR = "_symlink_format_manifest";
public static final String MANIFEST_FILE = SYMLINK_DIR + "/manifest";
public static final String META_FILE = SYMLINK_DIR + "/druid_export_meta";
public static final int MANIFEST_FILE_VERSION = 1;
private static final Logger log = new Logger(ExportMetadataManager.class);
private final ExportStorageProvider exportStorageProvider;
public ExportMetadataManager(final ExportStorageProvider exportStorageProvider)
{
this.exportStorageProvider = exportStorageProvider;
}
public void writeMetadata(List<String> exportedFiles) throws IOException
{
final StorageConnector storageConnector = exportStorageProvider.get();
log.info("Writing manifest file at location [%s]", exportStorageProvider.getBasePath());
if (storageConnector.pathExists(MANIFEST_FILE) || storageConnector.pathExists(META_FILE)) {
throw DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
.build("Found existing manifest file already present at path.");
}
createManifestFile(storageConnector, exportedFiles);
createDruidMetadataFile(storageConnector);
}
/**
* Creates a manifest file containing the list of files created by the export query. The manifest file consists of a
* new line separated list. Each line contains the absolute path to a file created by the export.
*/
public void createManifestFile(StorageConnector storageConnector, List<String> exportedFiles) throws IOException
{
try (PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(storageConnector.write(MANIFEST_FILE), StandardCharsets.UTF_8))) {
for (String exportedFile : exportedFiles) {
printWriter.println(exportStorageProvider.getFilePathForManifest(exportedFile));
}
}
}
/**
* Creates a druid metadata file at the export location. This file contains extra information about the export which
* cannot be stored in the manifest directly, since the manifest must follow the symlink format.
* <br>
* Currently, this only contains the manifest file version.
*/
private void createDruidMetadataFile(StorageConnector storageConnector) throws IOException
{
// Write the export manifest metadata information.
// This includes only the version number currently.
try (PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(storageConnector.write(META_FILE), StandardCharsets.UTF_8))) {
printWriter.println(StringUtils.format("version: %s", MANIFEST_FILE_VERSION));
}
}
}
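For reference, here is a minimal sketch of the intended call sequence. It is illustrative only: in a real deployment the controller obtains the storage provider from the query's `ExportMSQDestination` (and Druid's dependency injection configures it), and the list of files comes from the final stage's result object rather than being hard-coded.
```java
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import org.apache.druid.msq.exec.ExportMetadataManager;
import org.apache.druid.storage.local.LocalFileExportStorageProvider;

// Illustrative only; the provider and file name below are placeholders.
public class ExportMetadataManagerExample
{
  public static void main(String[] args) throws IOException
  {
    // In production, the controller gets this provider from the query's ExportMSQDestination.
    final LocalFileExportStorageProvider provider = new LocalFileExportStorageProvider("/tmp/export");
    final ExportMetadataManager exportMetadataManager = new ExportMetadataManager(provider);

    // Writes _symlink_format_manifest/manifest and _symlink_format_manifest/druid_export_meta
    // under /tmp/export; the manifest lists one absolute "file:" path per exported file.
    exportMetadataManager.writeMetadata(ImmutableList.of("query-abc-worker0-partition0.csv"));
  }
}
```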

File: MSQControllerTask.java

@ -50,6 +50,7 @@ import org.apache.druid.msq.exec.ControllerImpl;
import org.apache.druid.msq.exec.MSQTasks;
import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
import org.apache.druid.msq.indexing.destination.DurableStorageMSQDestination;
import org.apache.druid.msq.indexing.destination.ExportMSQDestination;
import org.apache.druid.msq.indexing.destination.MSQDestination;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.QueryContext;
@ -286,6 +287,11 @@ public class MSQControllerTask extends AbstractTask implements ClientTaskQuery
return querySpec.getDestination() instanceof DataSourceMSQDestination;
}
public static boolean isExport(final MSQSpec querySpec)
{
return querySpec.getDestination() instanceof ExportMSQDestination;
}
/**
* Returns true if the task reads from the same table as the destination. In this case, we would prefer to fail
* instead of reading any unused segments to ensure that old data is not read.

File: ExportMSQDestination.java

@ -52,8 +52,6 @@ public class ExportMSQDestination implements MSQDestination
this.resultFormat = resultFormat;
}
@JsonProperty("exportStorageProvider")
public ExportStorageProvider getExportStorageProvider()
{

File: ExportResultsFrameProcessor.java

@ -33,7 +33,6 @@ import org.apache.druid.frame.processor.ReturnOrAwait;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.segment.FrameStorageAdapter;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Unit;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.msq.counters.ChannelCounters;
@ -130,7 +129,7 @@ public class ExportResultsFrameProcessor implements FrameProcessor<Object>
}
if (inputChannel.isFinished()) {
return ReturnOrAwait.returnObject(Unit.instance());
return ReturnOrAwait.returnObject(exportFilePath);
} else {
exportFrame(inputChannel.read());
return ReturnOrAwait.awaitAll(1);

File: ExportResultsFrameProcessorFactory.java

@ -23,11 +23,13 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.processor.FrameProcessor;
import org.apache.druid.frame.processor.OutputChannelFactory;
import org.apache.druid.frame.processor.OutputChannels;
import org.apache.druid.frame.processor.manager.ProcessorManagers;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
@ -38,21 +40,24 @@ import org.apache.druid.msq.input.InputSlice;
import org.apache.druid.msq.input.InputSliceReader;
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.input.stage.StageInputSlice;
import org.apache.druid.msq.kernel.ExtraInfoHolder;
import org.apache.druid.msq.kernel.FrameContext;
import org.apache.druid.msq.kernel.FrameProcessorFactory;
import org.apache.druid.msq.kernel.NilExtraInfoHolder;
import org.apache.druid.msq.kernel.ProcessorsAndChannels;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.querykit.BaseFrameProcessorFactory;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
import org.apache.druid.sql.http.ResultFormat;
import org.apache.druid.storage.ExportStorageProvider;
import org.apache.druid.utils.CollectionUtils;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
@JsonTypeName("exportResults")
public class ExportResultsFrameProcessorFactory extends BaseFrameProcessorFactory
public class ExportResultsFrameProcessorFactory implements FrameProcessorFactory<Object, Object, Object>
{
private final String queryId;
private final ExportStorageProvider exportStorageProvider;
@ -101,7 +106,7 @@ public class ExportResultsFrameProcessorFactory extends BaseFrameProcessorFactor
}
@Override
public ProcessorsAndChannels<Object, Long> makeProcessors(
public ProcessorsAndChannels<Object, Object> makeProcessors(
StageDefinition stageDefinition,
int workerNumber,
List<InputSlice> inputSlices,
@ -120,7 +125,11 @@ public class ExportResultsFrameProcessorFactory extends BaseFrameProcessorFactor
);
if (inputSliceReader.numReadableInputs(slice) == 0) {
return new ProcessorsAndChannels<>(ProcessorManagers.none(), OutputChannels.none());
return new ProcessorsAndChannels<>(
ProcessorManagers.of(Sequences.<ExportResultsFrameProcessor>empty())
.withAccumulation(new ArrayList<String>(), (acc, file) -> acc),
OutputChannels.none()
);
}
ChannelCounters channelCounter = counters.channel(CounterNames.outputChannel());
@ -141,11 +150,50 @@ public class ExportResultsFrameProcessorFactory extends BaseFrameProcessorFactor
);
return new ProcessorsAndChannels<>(
ProcessorManagers.of(processors),
ProcessorManagers.of(processors)
.withAccumulation(new ArrayList<String>(), (acc, file) -> {
((ArrayList<String>) acc).add((String) file);
return acc;
}),
OutputChannels.none()
);
}
@Nullable
@Override
public TypeReference<Object> getResultTypeReference()
{
return new TypeReference<Object>() {};
}
@Override
public Object mergeAccumulatedResult(Object accumulated, Object otherAccumulated)
{
// If a worker does not return a list, fail the query
if (!(accumulated instanceof List)) {
throw DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
.build("Expected a list result from worker, received [%s] instead. This might be due to workers having an older version.", accumulated.getClass());
}
if (!(otherAccumulated instanceof List)) {
throw DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
.build("Expected a list result from worker, received [%s] instead. This might be due to workers having an older version.", otherAccumulated.getClass());
}
((List<String>) accumulated).addAll((List<String>) otherAccumulated);
return accumulated;
}
@Override
public ExtraInfoHolder makeExtraInfoHolder(@Nullable Object extra)
{
if (extra != null) {
throw new ISE("Expected null 'extra'");
}
return NilExtraInfoHolder.instance();
}
private static String getExportFilePath(String queryId, int workerNumber, int partitionNumber, ResultFormat exportFormat)
{
return StringUtils.format("%s-worker%s-partition%s.%s", queryId, workerNumber, partitionNumber, exportFormat.toString());

File: MSQExportTest.java

@ -62,7 +62,7 @@ public class MSQExportTest extends MSQTestBase
.verifyResults();
Assert.assertEquals(
1,
2, // result file and manifest file
Objects.requireNonNull(new File(exportDir.getAbsolutePath()).listFiles()).length
);
@ -93,16 +93,19 @@ public class MSQExportTest extends MSQTestBase
.verifyResults();
Assert.assertEquals(
1,
2,
Objects.requireNonNull(new File(exportDir.getAbsolutePath()).listFiles()).length
);
File resultFile = new File(exportDir, "query-test-query-worker0-partition0.csv");
List<String> results = readResultsFromFile(resultFile);
Assert.assertEquals(
expectedFoo2FileContents(true),
results
);
verifyManifestFile(exportDir, ImmutableList.of(resultFile));
}
@Test
@ -129,7 +132,7 @@ public class MSQExportTest extends MSQTestBase
.verifyResults();
Assert.assertEquals(
expectedFooFileContents(false).size(),
expectedFooFileContents(false).size() + 1, // + 1 for the manifest file
Objects.requireNonNull(new File(exportDir.getAbsolutePath()).listFiles()).length
);
}
@ -141,14 +144,13 @@ public class MSQExportTest extends MSQTestBase
expectedResults.add("cnt,dim");
}
expectedResults.addAll(ImmutableList.of(
"1,",
"1,10.1",
"1,2",
"1,1",
"1,def",
"1,abc"
)
);
"1,",
"1,10.1",
"1,2",
"1,1",
"1,def",
"1,abc"
));
return expectedResults;
}
@ -173,4 +175,35 @@ public class MSQExportTest extends MSQTestBase
return results;
}
}
private void verifyManifestFile(File exportDir, List<File> resultFiles) throws IOException
{
final File manifestFile = new File(exportDir, ExportMetadataManager.MANIFEST_FILE);
try (
BufferedReader bufferedReader = new BufferedReader(
new InputStreamReader(Files.newInputStream(manifestFile.toPath()), StringUtils.UTF8_STRING)
)
) {
for (File file : resultFiles) {
Assert.assertEquals(
StringUtils.format("file:%s", file.getAbsolutePath()),
bufferedReader.readLine()
);
}
Assert.assertNull(bufferedReader.readLine());
}
final File metaFile = new File(exportDir, ExportMetadataManager.META_FILE);
try (
BufferedReader bufferedReader = new BufferedReader(
new InputStreamReader(Files.newInputStream(metaFile.toPath()), StringUtils.UTF8_STRING)
)
) {
Assert.assertEquals(
StringUtils.format("version: %s", ExportMetadataManager.MANIFEST_FILE_VERSION),
bufferedReader.readLine()
);
Assert.assertNull(bufferedReader.readLine());
}
}
}

File: MSQTestBase.java

@ -159,8 +159,6 @@ import org.apache.druid.sql.SqlQueryPlus;
import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.SqlToolbox;
import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
import org.apache.druid.sql.calcite.export.TestExportStorageConnector;
import org.apache.druid.sql.calcite.export.TestExportStorageConnectorProvider;
import org.apache.druid.sql.calcite.external.ExternalDataSource;
import org.apache.druid.sql.calcite.external.ExternalOperatorConversion;
import org.apache.druid.sql.calcite.external.HttpOperatorConversion;
@ -318,7 +316,6 @@ public class MSQTestBase extends BaseCalciteQueryTest
protected SqlStatementFactory sqlStatementFactory;
protected AuthorizerMapper authorizerMapper;
private IndexIO indexIO;
protected TestExportStorageConnectorProvider exportStorageConnectorProvider = new TestExportStorageConnectorProvider();
// Contains the metadata of loaded segments
protected List<ImmutableSegmentLoadInfo> loadedSegmentsMetadata = new ArrayList<>();
// Mocks the return of data from data servers
@ -512,12 +509,6 @@ public class MSQTestBase extends BaseCalciteQueryTest
.build();
objectMapper = setupObjectMapper(injector);
objectMapper.registerModule(
new SimpleModule(StorageConnector.class.getSimpleName())
.registerSubtypes(
new NamedType(TestExportStorageConnectorProvider.class, TestExportStorageConnector.TYPE_NAME)
)
);
objectMapper.registerModules(new StorageConnectorModule().getJacksonModules());
objectMapper.registerModules(sqlModule.getJacksonModules());

File: S3ExportStorageProvider.java

@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.data.input.s3.S3InputSource;
import org.apache.druid.error.DruidException;
@ -43,6 +44,8 @@ import java.util.List;
public class S3ExportStorageProvider implements ExportStorageProvider
{
public static final String TYPE_NAME = S3InputSource.TYPE_KEY;
private static final String DELIM = "/";
private static final Joiner JOINER = Joiner.on(DELIM).skipNulls();
@JsonProperty
private final String bucket;
@JsonProperty
@ -143,4 +146,10 @@ public class S3ExportStorageProvider implements ExportStorageProvider
{
return new CloudObjectLocation(bucket, prefix).toUri(S3StorageDruidModule.SCHEME).toString();
}
@Override
public String getFilePathForManifest(String fileName)
{
return new CloudObjectLocation(bucket, JOINER.join(prefix, fileName)).toUri(S3StorageDruidModule.SCHEME).toString();
}
}

File: S3ExportStorageProviderTest.java

@ -57,4 +57,13 @@ public class S3ExportStorageProviderTest
() -> S3ExportStorageProvider.validateS3Prefix(validPrefixes, "bucket-name", "validPath123")
);
}
@Test
public void testExportManifestFilePath()
{
Assert.assertEquals(
"s3://export-bucket/export/table/file1",
new S3ExportStorageProvider("export-bucket", "export/table").getFilePathForManifest("file1")
);
}
}

File: ExportStorageProvider.java

@ -31,4 +31,6 @@ public interface ExportStorageProvider extends Provider<StorageConnector>
* Return a URI representation of the base path. This is used for logging and error messages.
*/
String getBasePath();
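/**
 * Returns the absolute path, as it should appear in the export manifest, of a file with the given
 * name created at this destination.
 */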
String getFilePathForManifest(String fileName);
}

File: LocalFileExportStorageProvider.java

@ -27,6 +27,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.storage.ExportStorageProvider;
import org.apache.druid.storage.StorageConfig;
import org.apache.druid.storage.StorageConnector;
@ -83,6 +84,13 @@ public class LocalFileExportStorageProvider implements ExportStorageProvider
return exportPath;
}
@Override
public String getFilePathForManifest(String fileName)
{
final File exportFile = new File(exportPath, fileName);
return StringUtils.format("file:%s", exportFile.toPath().normalize());
}
@Override
public boolean equals(Object o)
{

File: LocalFileExportStorageProviderTest.java

@ -90,4 +90,17 @@ public class LocalFileExportStorageProviderTest
() -> LocalFileExportStorageProvider.validateAndGetPath("/base", "/base1")
);
}
@Test
public void testExportManifestFilePath()
{
Assert.assertEquals(
"file:/base/path1/file1",
new LocalFileExportStorageProvider("/base/path1").getFilePathForManifest("file1")
);
Assert.assertEquals(
"file:/base/path1/file1",
new LocalFileExportStorageProvider("/base/../base/path1").getFilePathForManifest("file1")
);
}
}

File: CalciteExportTest.java

@ -20,6 +20,7 @@
package org.apache.druid.sql.calcite;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
@ -33,14 +34,13 @@ import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.sql.calcite.export.TestExportModule;
import org.apache.druid.sql.calcite.export.TestExportStorageConnector;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.destination.ExportDestination;
import org.apache.druid.sql.http.SqlParameter;
import org.apache.druid.storage.StorageConfig;
import org.apache.druid.storage.StorageConnector;
import org.apache.druid.storage.StorageConnectorProvider;
import org.apache.druid.storage.local.LocalFileExportStorageProvider;
import org.apache.druid.storage.local.LocalFileStorageConnectorProvider;
import org.hamcrest.CoreMatchers;
@ -57,7 +57,24 @@ public class CalciteExportTest extends CalciteIngestionDmlTest
public void configureGuice(DruidInjectorBuilder builder)
{
super.configureGuice(builder);
builder.addModule(new TestExportModule());
builder.addModule(
new DruidModule()
{
@Override
public void configure(Binder binder)
{
}
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of(
new SimpleModule(StorageConnectorProvider.class.getSimpleName()).registerSubtypes(
new NamedType(LocalFileExportStorageProvider.class, CalciteTests.FORBIDDEN_DESTINATION)
)
);
}
});
builder.addModule(new DruidModule()
{
@Override
@ -84,10 +101,10 @@ public class CalciteExportTest extends CalciteIngestionDmlTest
public void testReplaceIntoExtern()
{
testIngestionQuery()
.sql(StringUtils.format("REPLACE INTO EXTERN(%s(basePath => 'export')) "
.sql(StringUtils.format("REPLACE INTO EXTERN(%s(exportPath => 'export')) "
+ "AS CSV "
+ "OVERWRITE ALL "
+ "SELECT dim2 FROM foo", TestExportStorageConnector.TYPE_NAME))
+ "SELECT dim2 FROM foo", LocalFileExportStorageProvider.TYPE_NAME))
.expectQuery(
Druids.newScanQueryBuilder()
.dataSource(
@ -99,7 +116,7 @@ public class CalciteExportTest extends CalciteIngestionDmlTest
.legacy(false)
.build()
)
.expectResources(dataSourceRead("foo"), externalWrite(TestExportStorageConnector.TYPE_NAME))
.expectResources(dataSourceRead("foo"), externalWrite(LocalFileExportStorageProvider.TYPE_NAME))
.expectTarget(ExportDestination.TYPE_KEY, RowSignature.builder().add("dim2", ColumnType.STRING).build())
.verify();
}
@ -108,10 +125,10 @@ public class CalciteExportTest extends CalciteIngestionDmlTest
public void testReplaceIntoExternShouldThrowUnsupportedException()
{
testIngestionQuery()
.sql(StringUtils.format("REPLACE INTO EXTERN(%s(basePath => 'export')) "
.sql(StringUtils.format("REPLACE INTO EXTERN(%s(exportPath => 'export')) "
+ "AS CSV "
+ "OVERWRITE ALL "
+ "SELECT dim2 FROM foo", TestExportStorageConnector.TYPE_NAME))
+ "SELECT dim2 FROM foo", LocalFileExportStorageProvider.TYPE_NAME))
.expectValidationError(
CoreMatchers.allOf(
CoreMatchers.instanceOf(DruidException.class),
@ -145,10 +162,10 @@ public class CalciteExportTest extends CalciteIngestionDmlTest
public void testExportWithPartitionedBy()
{
testIngestionQuery()
.sql(StringUtils.format("INSERT INTO EXTERN(%s()) "
.sql(StringUtils.format("INSERT INTO EXTERN(%s(exportPath=>'/tmp/export')) "
+ "AS CSV "
+ "SELECT dim2 FROM foo "
+ "PARTITIONED BY ALL", TestExportStorageConnector.TYPE_NAME))
+ "PARTITIONED BY ALL", LocalFileStorageConnectorProvider.TYPE_NAME))
.expectValidationError(
DruidException.class,
"Export statements do not support a PARTITIONED BY or CLUSTERED BY clause."
@ -160,9 +177,9 @@ public class CalciteExportTest extends CalciteIngestionDmlTest
public void testInsertIntoExtern()
{
testIngestionQuery()
.sql(StringUtils.format("INSERT INTO EXTERN(%s()) "
.sql(StringUtils.format("INSERT INTO EXTERN(%s(exportPath=>'/tmp/export')) "
+ "AS CSV "
+ "SELECT dim2 FROM foo", TestExportStorageConnector.TYPE_NAME))
+ "SELECT dim2 FROM foo", LocalFileStorageConnectorProvider.TYPE_NAME))
.expectQuery(
Druids.newScanQueryBuilder()
.dataSource(
@ -174,7 +191,7 @@ public class CalciteExportTest extends CalciteIngestionDmlTest
.legacy(false)
.build()
)
.expectResources(dataSourceRead("foo"), externalWrite(TestExportStorageConnector.TYPE_NAME))
.expectResources(dataSourceRead("foo"), externalWrite(LocalFileStorageConnectorProvider.TYPE_NAME))
.expectTarget(ExportDestination.TYPE_KEY, RowSignature.builder().add("dim2", ColumnType.STRING).build())
.verify();
}
@ -184,9 +201,9 @@ public class CalciteExportTest extends CalciteIngestionDmlTest
public void testInsertIntoExternParameterized()
{
testIngestionQuery()
.sql(StringUtils.format("INSERT INTO EXTERN(%s()) "
.sql(StringUtils.format("INSERT INTO EXTERN(%s(exportPath=>'/tmp/export')) "
+ "AS CSV "
+ "SELECT dim2 FROM foo WHERE dim2=?", TestExportStorageConnector.TYPE_NAME))
+ "SELECT dim2 FROM foo WHERE dim2=?", LocalFileStorageConnectorProvider.TYPE_NAME))
.parameters(Collections.singletonList(new SqlParameter(SqlType.VARCHAR, "val")))
.expectQuery(
Druids.newScanQueryBuilder()
@ -200,7 +217,7 @@ public class CalciteExportTest extends CalciteIngestionDmlTest
.legacy(false)
.build()
)
.expectResources(dataSourceRead("foo"), externalWrite(TestExportStorageConnector.TYPE_NAME))
.expectResources(dataSourceRead("foo"), externalWrite(LocalFileStorageConnectorProvider.TYPE_NAME))
.expectTarget(ExportDestination.TYPE_KEY, RowSignature.builder().add("dim2", ColumnType.STRING).build())
.verify();
}
@ -211,9 +228,9 @@ public class CalciteExportTest extends CalciteIngestionDmlTest
public void testReplaceIntoExternParameterized()
{
testIngestionQuery()
.sql(StringUtils.format("REPLACE INTO EXTERN(%s()) "
.sql(StringUtils.format("REPLACE INTO EXTERN(%s(exportPath=>'/tmp/export')) "
+ "AS CSV "
+ "SELECT dim2 FROM foo WHERE dim2=?", TestExportStorageConnector.TYPE_NAME))
+ "SELECT dim2 FROM foo WHERE dim2=?", LocalFileStorageConnectorProvider.TYPE_NAME))
.parameters(Collections.singletonList(new SqlParameter(SqlType.VARCHAR, "val")))
.expectQuery(
Druids.newScanQueryBuilder()
@ -227,7 +244,7 @@ public class CalciteExportTest extends CalciteIngestionDmlTest
.legacy(false)
.build()
)
.expectResources(dataSourceRead("foo"), externalWrite(TestExportStorageConnector.TYPE_NAME))
.expectResources(dataSourceRead("foo"), externalWrite(LocalFileStorageConnectorProvider.TYPE_NAME))
.expectTarget(ExportDestination.TYPE_KEY, RowSignature.builder().add("dim2", ColumnType.STRING).build())
.verify();
}
@ -263,7 +280,7 @@ public class CalciteExportTest extends CalciteIngestionDmlTest
public void testWithForbiddenDestination()
{
testIngestionQuery()
.sql(StringUtils.format("insert into extern(%s()) as csv select __time, dim1 from foo", CalciteTests.FORBIDDEN_DESTINATION))
.sql(StringUtils.format("insert into extern(%s(exportPath=>'/tmp/export')) as csv select __time, dim1 from foo", CalciteTests.FORBIDDEN_DESTINATION))
.expectValidationError(ForbiddenException.class)
.verify();
}

File: TestExportModule.java (deleted)

@ -1,52 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.export;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.storage.StorageConnectorProvider;
import java.util.List;
public class TestExportModule implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of(
new SimpleModule(StorageConnectorProvider.class.getSimpleName())
.registerSubtypes(
new NamedType(TestExportStorageConnectorProvider.class, TestExportStorageConnector.TYPE_NAME),
new NamedType(TestExportStorageConnectorProvider.class, CalciteTests.FORBIDDEN_DESTINATION)
)
);
}
@Override
public void configure(Binder binder)
{
}
}

File: TestExportStorageConnector.java (deleted)

@ -1,92 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.export;
import com.google.common.collect.ImmutableList;
import org.apache.druid.storage.StorageConnector;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Iterator;
public class TestExportStorageConnector implements StorageConnector
{
public static final String TYPE_NAME = "testStorage";
private final ByteArrayOutputStream byteArrayOutputStream;
public TestExportStorageConnector()
{
this.byteArrayOutputStream = new ByteArrayOutputStream();
}
public ByteArrayOutputStream getByteArrayOutputStream()
{
return byteArrayOutputStream;
}
@Override
public boolean pathExists(String path)
{
return true;
}
@Override
public InputStream read(String path)
{
throw new UnsupportedOperationException();
}
@Override
public InputStream readRange(String path, long from, long size)
{
throw new UnsupportedOperationException();
}
@Override
public OutputStream write(String path)
{
return byteArrayOutputStream;
}
@Override
public void deleteFile(String path)
{
throw new UnsupportedOperationException();
}
@Override
public void deleteFiles(Iterable<String> paths)
{
throw new UnsupportedOperationException();
}
@Override
public void deleteRecursively(String path)
{
throw new UnsupportedOperationException();
}
@Override
public Iterator<String> listDir(String dirName)
{
return ImmutableList.<String>of().stream().iterator();
}
}

File: TestExportStorageConnectorProvider.java (deleted)

@ -1,46 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.export;
import org.apache.druid.storage.ExportStorageProvider;
import org.apache.druid.storage.StorageConnector;
public class TestExportStorageConnectorProvider implements ExportStorageProvider
{
private static final StorageConnector STORAGE_CONNECTOR = new TestExportStorageConnector();
@Override
public StorageConnector get()
{
return STORAGE_CONNECTOR;
}
@Override
public String getResourceType()
{
return "testExport";
}
@Override
public String getBasePath()
{
return "testExport";
}
}