add google as external storage for msq export (#16051)

Support for exporting msq results to gcs bucket. This is essentially copying the logic of s3 export for gs, originally done by @adarshsanjeev in this PR - #15689
This commit is contained in:
Parag Jain 2024-04-05 12:10:10 +05:30 committed by GitHub
parent 3ba878f21b
commit f55c9e58a8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 335 additions and 3 deletions

View File

@ -150,6 +150,39 @@ The following runtime parameters must be configured to export into an S3 destina
| `druid.export.storage.s3.maxRetry` | No | Defines the max number times to attempt S3 API calls to avoid failures due to transient errors. | 10 | | `druid.export.storage.s3.maxRetry` | No | Defines the max number times to attempt S3 API calls to avoid failures due to transient errors. | 10 |
| `druid.export.storage.s3.chunkSize` | No | Defines the size of each chunk to temporarily store in `tempDir`. The chunk size must be between 5 MiB and 5 GiB. A large chunk size reduces the API calls to S3, however it requires more disk space to store the temporary chunks. | 100MiB | | `druid.export.storage.s3.chunkSize` | No | Defines the size of each chunk to temporarily store in `tempDir`. The chunk size must be between 5 MiB and 5 GiB. A large chunk size reduces the API calls to S3, however it requires more disk space to store the temporary chunks. | 100MiB |
##### GS
Export results to GCS by passing the function `google()` as an argument to the `EXTERN` function. Note that this requires the `druid-google-extensions`.
The `google()` function is a Druid function that configures the connection. Arguments for `google()` should be passed as named parameters with the value in single quotes like the following example:
```sql
INSERT INTO
EXTERN(
google(bucket => 'your_bucket', prefix => 'prefix/to/files')
)
AS CSV
SELECT
<column>
FROM <table>
```
Supported arguments for the function:
| Parameter | Required | Description | Default |
|-------------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|
| `bucket` | Yes | The GS bucket to which the files are exported. The bucket and prefix combination should be whitelisted in `druid.export.storage.google.allowedExportPaths`. | n/a |
| `prefix` | Yes | Path where the exported files would be created. The export query expects the destination to be empty. If the location includes other files, then the query will fail. The bucket and prefix combination should be whitelisted in `druid.export.storage.google.allowedExportPaths`. | n/a |
The following runtime parameters must be configured to export into a GCS destination:
| Runtime Parameter | Required | Description | Default |
|--------------------------------------------------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|
| `druid.export.storage.google.tempLocalDir` | Yes | Directory used on the local storage of the worker to store temporary files required while uploading the data. | n/a |
| `druid.export.storage.google.allowedExportPaths` | Yes | An array of GS prefixes that are allowed as export destinations. Export queries fail if the export destination does not match any of the configured prefixes. Example: `["gs://bucket1/export/", "gs://bucket2/export/"]` | n/a |
| `druid.export.storage.google.maxRetry` | No | Defines the maximum number of times to attempt GS API calls to avoid failures due to transient errors. | 10 |
| `druid.export.storage.google.chunkSize` | No | Defines the size of each chunk to temporarily store in `tempLocalDir`. A large chunk size reduces the API calls to GS; however, it requires more disk space to store the temporary chunks. | 4MiB |
##### LOCAL ##### LOCAL
You can export to the local storage, which exports the results to the filesystem of the MSQ worker. You can export to the local storage, which exports the results to the filesystem of the MSQ worker.

View File

@ -50,7 +50,7 @@ import java.util.Set;
public class GoogleCloudStorageInputSource extends CloudObjectInputSource public class GoogleCloudStorageInputSource extends CloudObjectInputSource
{ {
static final String TYPE_KEY = GoogleStorageDruidModule.SCHEME; public static final String TYPE_KEY = GoogleStorageDruidModule.SCHEME;
private static final Logger LOG = new Logger(GoogleCloudStorageInputSource.class); private static final Logger LOG = new Logger(GoogleCloudStorageInputSource.class);
private final GoogleStorage storage; private final GoogleStorage storage;

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.storage.google.output;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.HumanReadableBytes;
import javax.annotation.Nullable;
import java.util.List;
/**
 * Immutable holder for the settings that govern exporting query results to Google Cloud Storage.
 *
 * <p>Every value is stored exactly as supplied to the constructor; no defaults are applied and no
 * validation is performed here, so any getter may return {@code null} when the corresponding
 * property was not provided.
 */
public class GoogleExportConfig
{
  /** Local directory used for temporary files while uploading. */
  @JsonProperty("tempLocalDir")
  private final String tempLocalDir;

  /** Size of each temporary chunk; may be {@code null} when not configured. */
  @JsonProperty("chunkSize")
  private final HumanReadableBytes chunkSize;

  /** Retry limit for storage calls; may be {@code null} when not configured. */
  @JsonProperty("maxRetry")
  private final Integer maxRetry;

  /** Destination prefixes that export queries are permitted to write to. */
  @JsonProperty("allowedExportPaths")
  private final List<String> allowedExportPaths;

  /**
   * Creates the config from its (JSON) properties.
   *
   * @param tempLocalDir       local directory for temporary upload files
   * @param chunkSize          chunk size, or {@code null} if unset
   * @param maxRetry           retry limit, or {@code null} if unset
   * @param allowedExportPaths list of permitted export destination prefixes
   */
  @JsonCreator
  public GoogleExportConfig(
      @JsonProperty("tempLocalDir") final String tempLocalDir,
      @JsonProperty("chunkSize") @Nullable final HumanReadableBytes chunkSize,
      @JsonProperty("maxRetry") @Nullable final Integer maxRetry,
      @JsonProperty("allowedExportPaths") final List<String> allowedExportPaths
  )
  {
    this.tempLocalDir = tempLocalDir;
    this.chunkSize = chunkSize;
    this.maxRetry = maxRetry;
    this.allowedExportPaths = allowedExportPaths;
  }

  /** @return the configured temporary local directory, as supplied at construction */
  public String getTempLocalDir()
  {
    return this.tempLocalDir;
  }

  /** @return the configured chunk size, as supplied at construction */
  public HumanReadableBytes getChunkSize()
  {
    return this.chunkSize;
  }

  /** @return the configured retry limit, as supplied at construction */
  public Integer getMaxRetry()
  {
    return this.maxRetry;
  }

  /** @return the configured list of allowed export paths, as supplied at construction */
  public List<String> getAllowedExportPaths()
  {
    return this.allowedExportPaths;
  }
}

View File

@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.storage.google.output;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.data.input.google.GoogleCloudStorageInputSource;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.storage.ExportStorageProvider;
import org.apache.druid.storage.StorageConnector;
import org.apache.druid.storage.google.GoogleInputDataConfig;
import org.apache.druid.storage.google.GoogleStorage;
import org.apache.druid.storage.google.GoogleStorageDruidModule;
import javax.validation.constraints.NotNull;
import java.io.File;
import java.net.URI;
import java.util.List;
/**
 * {@link ExportStorageProvider} that targets a Google Cloud Storage bucket and prefix.
 *
 * <p>The {@code bucket} and {@code prefix} come from the JSON payload; the export config, storage
 * client and input data config are supplied through Jackson injection. Before a connector is
 * handed out, the requested destination is checked against the configured list of allowed export
 * paths.
 */
@JsonTypeName(GoogleExportStorageProvider.TYPE_NAME)
public class GoogleExportStorageProvider implements ExportStorageProvider
{
  public static final String TYPE_NAME = GoogleCloudStorageInputSource.TYPE_KEY;

  @JsonProperty
  private final String bucket;
  @JsonProperty
  private final String prefix;

  @JacksonInject
  GoogleExportConfig googleExportConfig;
  @JacksonInject
  GoogleStorage googleStorage;
  @JacksonInject
  GoogleInputDataConfig googleInputDataConfig;

  /**
   * @param bucket destination bucket (required JSON property)
   * @param prefix destination prefix inside the bucket (required JSON property)
   */
  @JsonCreator
  public GoogleExportStorageProvider(
      @JsonProperty(value = "bucket", required = true) String bucket,
      @JsonProperty(value = "prefix", required = true) String prefix
  )
  {
    this.bucket = bucket;
    this.prefix = prefix;
  }

  /**
   * Builds a {@link StorageConnector} for the configured bucket and prefix.
   *
   * @return a new {@link GoogleStorageConnector}
   * @throws DruidException if {@code tempLocalDir} or {@code allowedExportPaths} is not configured,
   *                        or if the destination does not fall under any allowed export path
   */
  @Override
  public StorageConnector get()
  {
    final String tempDir = googleExportConfig.getTempLocalDir();
    if (tempDir == null) {
      throw DruidException.forPersona(DruidException.Persona.OPERATOR)
                          .ofCategory(DruidException.Category.NOT_FOUND)
                          .build("The runtime property `druid.export.storage.google.tempLocalDir` must be configured for GCS export.");
    }
    final List<String> allowedExportPaths = googleExportConfig.getAllowedExportPaths();
    if (allowedExportPaths == null) {
      throw DruidException.forPersona(DruidException.Persona.OPERATOR)
                          .ofCategory(DruidException.Category.NOT_FOUND)
                          .build(
                              "The runtime property `druid.export.storage.google.allowedExportPaths` must be configured for GCS export.");
    }
    validatePrefix(allowedExportPaths, bucket, prefix);
    final GoogleOutputConfig googleOutputConfig = new GoogleOutputConfig(
        bucket,
        prefix,
        new File(tempDir),
        googleExportConfig.getChunkSize(),
        googleExportConfig.getMaxRetry()
    );
    return new GoogleStorageConnector(googleOutputConfig, googleStorage, googleInputDataConfig);
  }

  /**
   * Verifies that the destination formed by {@code bucket} and {@code prefix} lies under at least
   * one of the allowed export paths.
   *
   * @param allowedExportPaths allowed destination URIs, e.g. {@code gs://bucket/path}
   * @param bucket             requested destination bucket
   * @param prefix             requested destination prefix
   * @throws DruidException if no allowed path matches the requested destination
   */
  @VisibleForTesting
  static void validatePrefix(@NotNull final List<String> allowedExportPaths, final String bucket, final String prefix)
  {
    final URI providedUri = new CloudObjectLocation(bucket, prefix).toUri(GoogleStorageDruidModule.SCHEME_GS);
    for (final String path : allowedExportPaths) {
      final URI allowedUri = URI.create(path);
      if (validateUri(allowedUri, providedUri)) {
        return;
      }
    }
    throw DruidException.forPersona(DruidException.Persona.USER)
                        .ofCategory(DruidException.Category.INVALID_INPUT)
                        .build("None of the allowed prefixes matched the input path [%s]. "
                               + "Please reach out to the cluster admin for the whitelisted paths for export. "
                               + "The paths are controlled via the property `druid.export.storage.google.allowedExportPaths`.",
                               providedUri);
  }

  /**
   * Returns true when {@code providedUri} refers to the same bucket as {@code allowedUri} and its
   * path is equal to, or nested under, the allowed path.
   */
  private static boolean validateUri(final URI allowedUri, final URI providedUri)
  {
    // Compare authorities instead of hosts. URI#getHost() returns null when the authority is not a
    // syntactically valid server-based host name (for example a bucket name containing '_') or when
    // the URI has no authority at all, and calling equals() on that null would throw a
    // NullPointerException instead of producing a match / clean rejection.
    final String allowedBucket = allowedUri.getAuthority();
    if (allowedBucket == null || !allowedBucket.equals(providedUri.getAuthority())) {
      return false;
    }
    // Normalise both paths to end with '/' so that "/validPath1" does not match "/validPath123".
    final String allowedPath = StringUtils.maybeAppendTrailingSlash(allowedUri.getPath());
    final String providedPath = StringUtils.maybeAppendTrailingSlash(providedUri.getPath());
    return providedPath.startsWith(allowedPath);
  }

  /** @return the destination bucket */
  @JsonProperty("bucket")
  public String getBucket()
  {
    return bucket;
  }

  /** @return the destination prefix */
  @JsonProperty("prefix")
  public String getPrefix()
  {
    return prefix;
  }

  /** @return the resource type name of this provider, {@link #TYPE_NAME} */
  @Override
  @JsonIgnore
  public String getResourceType()
  {
    return TYPE_NAME;
  }

  /** @return the destination rendered as a URI string built from the bucket and prefix */
  @Override
  @JsonIgnore
  public String getBasePath()
  {
    return new CloudObjectLocation(bucket, prefix).toUri(GoogleStorageDruidModule.SCHEME_GS).toString();
  }
}

View File

@ -22,6 +22,7 @@ package org.apache.druid.storage.google.output;
import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder; import com.google.inject.Binder;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.initialization.DruidModule; import org.apache.druid.initialization.DruidModule;
import java.util.Collections; import java.util.Collections;
@ -33,12 +34,15 @@ public class GoogleStorageConnectorModule implements DruidModule
public List<? extends Module> getJacksonModules() public List<? extends Module> getJacksonModules()
{ {
return Collections.singletonList( return Collections.singletonList(
new SimpleModule(this.getClass().getSimpleName()).registerSubtypes(GoogleStorageConnectorProvider.class)); new SimpleModule(this.getClass().getSimpleName())
.registerSubtypes(GoogleStorageConnectorProvider.class)
.registerSubtypes(GoogleExportStorageProvider.class)
);
} }
@Override @Override
public void configure(Binder binder) public void configure(Binder binder)
{ {
JsonConfigProvider.bind(binder, "druid.export.storage.google", GoogleExportConfig.class);
} }
} }

View File

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.storage.google.output;
import com.google.common.collect.ImmutableList;
import org.apache.druid.error.DruidException;
import org.apache.druid.storage.StorageConnector;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
/**
 * Unit tests for {@link GoogleExportStorageProvider}: connector creation and validation of the
 * requested destination against the allowed export paths.
 */
public class GoogleExportStorageProviderTest
{
  /** Allowed export paths shared by the tests below. */
  private final List<String> allowedPaths = ImmutableList.of(
      "gs://bucket-name/validPath1",
      "gs://bucket-name/validPath2"
  );

  @Test
  public void testGoogleExportStorageProvider()
  {
    final GoogleExportStorageProvider provider = new GoogleExportStorageProvider("bucket-name", "validPath1");
    // Only the export config is populated; the other injected fields are left unset.
    provider.googleExportConfig = new GoogleExportConfig("tempLocalDir", null, null, allowedPaths);

    final StorageConnector connector = provider.get();

    Assert.assertNotNull(connector);
    Assert.assertTrue(connector instanceof GoogleStorageConnector);
    Assert.assertEquals("gs://bucket-name/validPath1", provider.getBasePath());
  }

  @Test
  public void testValidatePaths()
  {
    // Destinations that must be accepted (no exception expected).
    GoogleExportStorageProvider.validatePrefix(allowedPaths, "bucket-name", "validPath1/");
    GoogleExportStorageProvider.validatePrefix(allowedPaths, "bucket-name", "validPath1");
    GoogleExportStorageProvider.validatePrefix(allowedPaths, "bucket-name", "validPath1/validSubPath/");

    // An allowed path consisting of just the bucket admits any prefix inside it.
    final List<String> wholeBucket = ImmutableList.of("gs://bucket-name");
    GoogleExportStorageProvider.validatePrefix(wholeBucket, "bucket-name", "");
    GoogleExportStorageProvider.validatePrefix(wholeBucket, "bucket-name", "validPath");

    GoogleExportStorageProvider.validatePrefix(allowedPaths, "bucket-name", "validPath1/../validPath2/");

    // Destinations that must be rejected.
    // Wrong bucket.
    Assert.assertThrows(
        DruidException.class,
        () -> GoogleExportStorageProvider.validatePrefix(allowedPaths, "incorrect-bucket", "validPath1/")
    );
    // Path not under any allowed prefix.
    Assert.assertThrows(
        DruidException.class,
        () -> GoogleExportStorageProvider.validatePrefix(allowedPaths, "bucket-name", "invalidPath1")
    );
    // Shares a textual prefix with validPath1 but is a different path segment.
    Assert.assertThrows(
        DruidException.class,
        () -> GoogleExportStorageProvider.validatePrefix(allowedPaths, "bucket-name", "validPath123")
    );
  }
}

View File

@ -21,6 +21,7 @@
1M 1M
100MiB 100MiB
32-bit 32-bit
4MiB
500MiB 500MiB
64-bit 64-bit
ACL ACL