From b7c661b80131ac7ec2485438a05f21aa4e63d5ad Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Tue, 29 Oct 2024 13:36:59 +0530 Subject: [PATCH] Make tempStorageDirectory configuration optional and rely on task dir instead (#17015) Currently, durable storage and export both require configuring a temporary directory via druid.export.storage.{type}.tempLocalDir (for example, druid.export.storage.s3.tempLocalDir) and druid.msq.intermediate.storage.tempDir. Tasks on the middle manager already have a configured temporary directory. This PR aims to reduce the configuration required by using the task directory as a default if it is not explicitly configured, thus reducing the number of configs that a user has to set. Please note that preference is given to the user-configured druid.*.storage.temp*Dir on the tasks. If that is not configured, we then use the task's temporary directory. Overlord and brokers also require storage connector configurations (for the durableStorageCleanerOverlordDuty and to fetch results of async queries respectively), but do not have a default temporary task directory. The configuration is still required for these services. 
--- docs/multi-stage-query/reference.md | 14 +++---- .../azure/output/AzureOutputConfig.java | 17 +++++++- .../output/AzureStorageConnectorProvider.java | 8 ++-- .../azure/output/AzureOutputConfigTest.java | 3 +- .../azure/output/AzureOutputSerdeTest.java | 12 ------ .../AzureStorageConnectorProviderTest.java | 41 +++++++++++-------- .../output/GoogleExportStorageProvider.java | 13 +++--- .../google/output/GoogleOutputConfig.java | 9 +++- .../GoogleStorageConnectorProvider.java | 7 ++-- .../GoogleExportStorageProviderTest.java | 6 ++- .../GoogleStorageConnectorProviderTest.java | 10 ++--- .../druid/msq/exec/ControllerContext.java | 10 +++++ .../apache/druid/msq/exec/ControllerImpl.java | 28 +++---------- .../druid/msq/exec/ExportMetadataManager.java | 7 +++- .../msq/guice/MSQDurableStorageModule.java | 12 +----- .../indexing/IndexerControllerContext.java | 16 +++++++- .../msq/indexing/IndexerWorkerContext.java | 9 +++- .../cleaner/DurableStorageCleaner.java | 5 ++- .../ExportResultsFrameProcessorFactory.java | 2 +- .../sql/resources/SqlStatementResource.java | 5 ++- .../indexing/DurableStorageCleanerTest.java | 5 ++- .../indexing/IndexerWorkerContextTest.java | 6 +++ .../SqlMSQStatementResourcePostTest.java | 5 +-- .../resources/SqlStatementResourceTest.java | 5 +-- .../msq/test/MSQTestControllerContext.java | 9 ++++ .../storage/s3/output/S3ExportConfig.java | 3 +- .../s3/output/S3ExportStorageProvider.java | 14 +++---- .../storage/s3/output/S3OutputConfig.java | 10 ++++- .../s3/output/S3StorageConnectorProvider.java | 8 ++-- .../s3/S3StorageConnectorProviderTest.java | 16 +++----- .../storage/s3/output/S3OutputSerdeTest.java | 14 ------- .../druid/storage/ExportStorageProvider.java | 7 +++- .../storage/StorageConnectorProvider.java | 15 ++++++- .../local/LocalFileExportStorageProvider.java | 2 +- .../LocalFileStorageConnectorProvider.java | 2 +- .../storage/StorageConnectorModuleTest.java | 5 ++- .../local/LocalFileStorageConnectorTest.java | 20 ++++----- 37 files 
changed, 213 insertions(+), 167 deletions(-) diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md index 50f07ff80b4..6facbaedcb3 100644 --- a/docs/multi-stage-query/reference.md +++ b/docs/multi-stage-query/reference.md @@ -155,8 +155,8 @@ The following runtime parameters must be configured to export into an S3 destina | Runtime Parameter | Required | Description | Default | |----------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----| -| `druid.export.storage.s3.tempLocalDir` | Yes | Directory used on the local storage of the worker to store temporary files required while uploading the data. | n/a | | `druid.export.storage.s3.allowedExportPaths` | Yes | An array of S3 prefixes that are whitelisted as export destinations. Export queries fail if the export destination does not match any of the configured prefixes. Example: `[\"s3://bucket1/export/\", \"s3://bucket2/export/\"]` | n/a | +| `druid.export.storage.s3.tempLocalDir` | No | Directory used on the local storage of the worker to store temporary files required while uploading the data. Uses the task temporary directory by default. | n/a | | `druid.export.storage.s3.maxRetry` | No | Defines the max number times to attempt S3 API calls to avoid failures due to transient errors. | 10 | | `druid.export.storage.s3.chunkSize` | No | Defines the size of each chunk to temporarily store in `tempDir`. The chunk size must be between 5 MiB and 5 GiB. A large chunk size reduces the API calls to S3, however it requires more disk space to store the temporary chunks. 
| 100MiB | @@ -186,12 +186,12 @@ Supported arguments for the function: The following runtime parameters must be configured to export into a GCS destination: -| Runtime Parameter | Required | Description | Default | -|--------------------------------------------------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------| -| `druid.export.storage.google.tempLocalDir` | Yes | Directory used on the local storage of the worker to store temporary files required while uploading the data. | n/a | +| Runtime Parameter | Required | Description | Default | +|--------------------------------------------------|----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------| | `druid.export.storage.google.allowedExportPaths` | Yes | An array of GS prefixes that are allowed as export destinations. Export queries fail if the export destination does not match any of the configured prefixes. Example: `[\"gs://bucket1/export/\", \"gs://bucket2/export/\"]` | n/a | -| `druid.export.storage.google.maxRetry` | No | Defines the max number times to attempt GS API calls to avoid failures due to transient errors. | 10 | -| `druid.export.storage.google.chunkSize` | No | Defines the size of each chunk to temporarily store in `tempDir`. A large chunk size reduces the API calls to GS; however, it requires more disk space to store the temporary chunks. | 4MiB | +| `druid.export.storage.google.tempLocalDir` | No | Directory used on the local storage of the worker to store temporary files required while uploading the data. Uses the task temporary directory by default. 
| n/a | +| `druid.export.storage.google.maxRetry` | No | Defines the max number times to attempt GS API calls to avoid failures due to transient errors. | 10 | +| `druid.export.storage.google.chunkSize` | No | Defines the size of each chunk to temporarily store in `tempDir`. A large chunk size reduces the API calls to GS; however, it requires more disk space to store the temporary chunks. | 4MiB | ##### LOCAL @@ -531,7 +531,7 @@ Common properties to configure the behavior of durable storage |--|--|--| |`druid.msq.intermediate.storage.enable` | Yes | Whether to enable durable storage for the cluster. Set it to true to enable durable storage. For more information about enabling durable storage, see [Durable storage](../operations/durable-storage.md). | false | |`druid.msq.intermediate.storage.type` | Yes | The type of storage to use. Set it to `s3` for S3, `azure` for Azure and `google` for Google | n/a | -|`druid.msq.intermediate.storage.tempDir`| Yes | Directory path on the local disk to store temporary files required while uploading and downloading the data | n/a | +|`druid.msq.intermediate.storage.tempDir`| Yes | Directory path on the local disk to store temporary files required while uploading and downloading the data. If the property is not configured on the indexer or middle manager, it defaults to using the task temporary directory. | n/a | |`druid.msq.intermediate.storage.maxRetry` | No | Defines the max number times to attempt S3 API calls to avoid failures due to transient errors. | 10 | |`druid.msq.intermediate.storage.chunkSize` | No | Defines the size of each chunk to temporarily store in `druid.msq.intermediate.storage.tempDir`. The chunk size must be between 5 MiB and 5 GiB. A large chunk size reduces the API calls made to the durable storage, however it requires more disk space to store the temporary chunks. 
Druid uses a default of 100MiB if the value is not provided.| 100MiB | diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureOutputConfig.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureOutputConfig.java index 7af9c856c5f..004e79a66a6 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureOutputConfig.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureOutputConfig.java @@ -42,6 +42,7 @@ public class AzureOutputConfig @JsonProperty private final String prefix; + @Nullable @JsonProperty private final File tempDir; @@ -64,7 +65,7 @@ public class AzureOutputConfig public AzureOutputConfig( @JsonProperty(value = "container", required = true) String container, @JsonProperty(value = "prefix", required = true) String prefix, - @JsonProperty(value = "tempDir", required = true) File tempDir, + @JsonProperty(value = "tempDir") @Nullable File tempDir, @JsonProperty(value = "chunkSize") @Nullable HumanReadableBytes chunkSize, @JsonProperty(value = "maxRetry") @Nullable Integer maxRetry ) @@ -77,7 +78,6 @@ public class AzureOutputConfig validateFields(); } - public String getContainer() { return container; @@ -88,6 +88,7 @@ public class AzureOutputConfig return prefix; } + @Nullable public File getTempDir() { return tempDir; @@ -103,6 +104,11 @@ public class AzureOutputConfig return maxRetry; } + public AzureOutputConfig withTempDir(File tempDir) + { + return new AzureOutputConfig(container, prefix, tempDir, chunkSize, maxRetry); + } + private void validateFields() { if (chunkSize.getBytes() < AZURE_MIN_CHUNK_SIZE_BYTES || chunkSize.getBytes() > AZURE_MAX_CHUNK_SIZE_BYTES) { @@ -113,6 +119,13 @@ public class AzureOutputConfig AZURE_MAX_CHUNK_SIZE_BYTES ); } + } + + public void validateTempDirectory() + { + if (tempDir == null) { + throw DruidException.defensive("The runtime property 
`druid.msq.intermediate.storage.tempDir` must be configured."); + } try { FileUtils.mkdirp(tempDir); diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProvider.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProvider.java index 79be724c17f..0f2cdb47d44 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProvider.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProvider.java @@ -45,7 +45,7 @@ public class AzureStorageConnectorProvider extends AzureOutputConfig implements public AzureStorageConnectorProvider( @JsonProperty(value = "container", required = true) String container, @JsonProperty(value = "prefix", required = true) String prefix, - @JsonProperty(value = "tempDir", required = true) File tempDir, + @JsonProperty(value = "tempDir") @Nullable File tempDir, @JsonProperty(value = "chunkSize") @Nullable HumanReadableBytes chunkSize, @JsonProperty(value = "maxRetry") @Nullable Integer maxRetry ) @@ -54,8 +54,10 @@ public class AzureStorageConnectorProvider extends AzureOutputConfig implements } @Override - public StorageConnector get() + public StorageConnector createStorageConnector(final File defaultTempDir) { - return new AzureStorageConnector(this, azureStorage); + AzureOutputConfig config = this.getTempDir() == null ? 
this.withTempDir(defaultTempDir) : this; + config.validateTempDirectory(); + return new AzureStorageConnector(config, azureStorage); } } diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureOutputConfigTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureOutputConfigTest.java index 058887316ec..fca486a4246 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureOutputConfigTest.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureOutputConfigTest.java @@ -57,10 +57,9 @@ public class AzureOutputConfigTest throw new ISE("Unable to change the permission of temp folder for %s", this.getClass().getName()); } - //noinspection ResultOfObjectAllocationIgnored assertThrows( DruidException.class, - () -> new AzureOutputConfig(CONTAINER, PREFIX, tempDir, null, MAX_RETRY_COUNT) + () -> new AzureOutputConfig(CONTAINER, PREFIX, tempDir, null, MAX_RETRY_COUNT).validateTempDirectory() ); } diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureOutputSerdeTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureOutputSerdeTest.java index aea5232217a..1ece63ccfcc 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureOutputSerdeTest.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureOutputSerdeTest.java @@ -86,18 +86,6 @@ public class AzureOutputSerdeTest assertThrows(MismatchedInputException.class, () -> MAPPER.readValue(json, AzureOutputConfig.class)); } - @Test - public void noTempDir() - { - String json = jsonStringReadyForAssert("{\n" - + " \"prefix\": \"abc\",\n" - + " \"container\": \"TEST\",\n" - + " \"chunkSize\":104857600,\n" - + " \"maxRetry\": 2\n" - + "}\n"); - 
assertThrows(MismatchedInputException.class, () -> MAPPER.readValue(json, AzureOutputConfig.class)); - } - @Test public void leastArguments() throws JsonProcessingException { diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProviderTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProviderTest.java index 0b76f02af29..74141519bf9 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProviderTest.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProviderTest.java @@ -25,10 +25,9 @@ import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.ProvisionException; import com.google.inject.name.Names; +import org.apache.druid.error.DruidException; import org.apache.druid.guice.JsonConfigProvider; -import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.StartupInjectorBuilder; -import org.apache.druid.storage.StorageConnector; import org.apache.druid.storage.StorageConnectorModule; import org.apache.druid.storage.StorageConnectorProvider; import org.apache.druid.storage.azure.AzureStorage; @@ -59,7 +58,7 @@ public class AzureStorageConnectorProviderTest StorageConnectorProvider storageConnectorProvider = getStorageConnectorProvider(properties); assertInstanceOf(AzureStorageConnectorProvider.class, storageConnectorProvider); - assertInstanceOf(AzureStorageConnector.class, storageConnectorProvider.get()); + assertInstanceOf(AzureStorageConnector.class, storageConnectorProvider.createStorageConnector(new File("/tmp"))); assertEquals("container", ((AzureStorageConnectorProvider) storageConnectorProvider).getContainer()); assertEquals("prefix", ((AzureStorageConnectorProvider) storageConnectorProvider).getPrefix()); assertEquals(new File("/tmp"), @@ -107,30 +106,36 
@@ public class AzureStorageConnectorProviderTest properties.setProperty(CUSTOM_NAMESPACE + ".prefix", "prefix"); assertThrows( - ProvisionException.class, - () -> getStorageConnectorProvider(properties), - "Missing required creator property 'tempDir'" + DruidException.class, + () -> getStorageConnectorProvider(properties).createStorageConnector(null), + "The runtime property `druid.msq.intermediate.storage.tempDir` must be configured." ); } + @Test + public void createAzureStorageFactoryWithMissingTempDirButProvidedDuringRuntime() + { + + final Properties properties = new Properties(); + properties.setProperty(CUSTOM_NAMESPACE + ".type", "azure"); + properties.setProperty(CUSTOM_NAMESPACE + ".container", "container"); + properties.setProperty(CUSTOM_NAMESPACE + ".prefix", "prefix"); + + getStorageConnectorProvider(properties).createStorageConnector(new File("/tmp")); + } + private StorageConnectorProvider getStorageConnectorProvider(Properties properties) { StartupInjectorBuilder startupInjectorBuilder = new StartupInjectorBuilder().add( new AzureStorageDruidModule(), new StorageConnectorModule(), new AzureStorageConnectorModule(), - binder -> { - JsonConfigProvider.bind( - binder, - CUSTOM_NAMESPACE, - StorageConnectorProvider.class, - Names.named(CUSTOM_NAMESPACE) - ); - - binder.bind(Key.get(StorageConnector.class, Names.named(CUSTOM_NAMESPACE))) - .toProvider(Key.get(StorageConnectorProvider.class, Names.named(CUSTOM_NAMESPACE))) - .in(LazySingleton.class); - } + binder -> JsonConfigProvider.bind( + binder, + CUSTOM_NAMESPACE, + StorageConnectorProvider.class, + Names.named(CUSTOM_NAMESPACE) + ) ).withProperties(properties); Injector injector = startupInjectorBuilder.build(); diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleExportStorageProvider.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleExportStorageProvider.java index 8d0c6b50b31..eba6545272d 
100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleExportStorageProvider.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleExportStorageProvider.java @@ -70,13 +70,12 @@ public class GoogleExportStorageProvider implements ExportStorageProvider } @Override - public StorageConnector get() + public StorageConnector createStorageConnector(File taskTempDir) { - final String tempDir = googleExportConfig.getTempLocalDir(); - if (tempDir == null) { - throw DruidException.forPersona(DruidException.Persona.OPERATOR) - .ofCategory(DruidException.Category.NOT_FOUND) - .build("The runtime property `druid.export.storage.google.tempLocalDir` must be configured for GCS export."); + final String exportConfigTempDir = googleExportConfig.getTempLocalDir(); + final File tempDirFile = exportConfigTempDir != null ? new File(exportConfigTempDir) : taskTempDir; + if (tempDirFile == null) { + throw DruidException.defensive("Couldn't find temporary directory for export."); } final List allowedExportPaths = googleExportConfig.getAllowedExportPaths(); if (allowedExportPaths == null) { @@ -89,7 +88,7 @@ public class GoogleExportStorageProvider implements ExportStorageProvider final GoogleOutputConfig googleOutputConfig = new GoogleOutputConfig( bucket, prefix, - new File(tempDir), + tempDirFile, googleExportConfig.getChunkSize(), googleExportConfig.getMaxRetry() ); diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java index c9c78151ae9..4e42c6d274a 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleOutputConfig.java @@ -37,6 +37,7 @@ 
public class GoogleOutputConfig @JsonProperty private final String prefix; + @Nullable @JsonProperty private final File tempDir; @@ -58,7 +59,7 @@ public class GoogleOutputConfig public GoogleOutputConfig( final String bucket, final String prefix, - final File tempDir, + @Nullable final File tempDir, @Nullable final HumanReadableBytes chunkSize, @Nullable final Integer maxRetry ) @@ -82,6 +83,7 @@ public class GoogleOutputConfig return prefix; } + @Nullable public File getTempDir() { return tempDir; @@ -97,6 +99,11 @@ public class GoogleOutputConfig return maxRetry; } + public GoogleOutputConfig withTempDir(File tempDir) + { + return new GoogleOutputConfig(bucket, prefix, tempDir, chunkSize, maxRetry); + } + private void validateFields() { if (chunkSize.getBytes() < GOOGLE_MIN_CHUNK_SIZE_BYTES || chunkSize.getBytes() > GOOGLE_MAX_CHUNK_SIZE_BYTES) { diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java index 49856a9c1ef..ac5c6a3c964 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProvider.java @@ -47,7 +47,7 @@ public class GoogleStorageConnectorProvider extends GoogleOutputConfig implement public GoogleStorageConnectorProvider( @JsonProperty(value = "bucket", required = true) String bucket, @JsonProperty(value = "prefix", required = true) String prefix, - @JsonProperty(value = "tempDir", required = true) File tempDir, + @JsonProperty(value = "tempDir") @Nullable File tempDir, @JsonProperty(value = "chunkSize") @Nullable HumanReadableBytes chunkSize, @JsonProperty(value = "maxRetry") @Nullable Integer maxRetry ) @@ -56,8 +56,9 @@ public class 
GoogleStorageConnectorProvider extends GoogleOutputConfig implement } @Override - public StorageConnector get() + public StorageConnector createStorageConnector(File defaultTempDir) { - return new GoogleStorageConnector(this, googleStorage, googleInputDataConfig); + GoogleOutputConfig config = this.getTempDir() == null ? this.withTempDir(defaultTempDir) : this; + return new GoogleStorageConnector(config, googleStorage, googleInputDataConfig); } } diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleExportStorageProviderTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleExportStorageProviderTest.java index e40846a848d..8b9c8691849 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleExportStorageProviderTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleExportStorageProviderTest.java @@ -21,14 +21,18 @@ package org.apache.druid.storage.google.output; import com.google.common.collect.ImmutableList; import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.storage.StorageConnector; import org.junit.Assert; import org.junit.Test; +import java.io.File; import java.util.List; public class GoogleExportStorageProviderTest { + private final File tempDir = FileUtils.createTempDir(); + private final List validPrefixes = ImmutableList.of( "gs://bucket-name/validPath1", "gs://bucket-name/validPath2" @@ -39,7 +43,7 @@ public class GoogleExportStorageProviderTest { GoogleExportStorageProvider googleExportStorageProvider = new GoogleExportStorageProvider("bucket-name", "validPath1"); googleExportStorageProvider.googleExportConfig = new GoogleExportConfig("tempLocalDir", null, null, validPrefixes); - StorageConnector storageConnector = googleExportStorageProvider.get(); + StorageConnector 
storageConnector = googleExportStorageProvider.createStorageConnector(tempDir); Assert.assertNotNull(storageConnector); Assert.assertTrue(storageConnector instanceof GoogleStorageConnector); diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProviderTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProviderTest.java index df6c66e84c3..a264ead94a1 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProviderTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/output/GoogleStorageConnectorProviderTest.java @@ -26,9 +26,8 @@ import com.google.inject.Key; import com.google.inject.ProvisionException; import com.google.inject.name.Names; import org.apache.druid.guice.JsonConfigProvider; -import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.StartupInjectorBuilder; -import org.apache.druid.storage.StorageConnector; +import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.storage.StorageConnectorModule; import org.apache.druid.storage.StorageConnectorProvider; import org.apache.druid.storage.google.GoogleInputDataConfig; @@ -44,6 +43,7 @@ import java.util.Properties; public class GoogleStorageConnectorProviderTest { private static final String CUSTOM_NAMESPACE = "custom"; + private final File tempDir = FileUtils.createTempDir(); @Test public void createGoogleStorageFactoryWithRequiredProperties() @@ -57,7 +57,7 @@ public class GoogleStorageConnectorProviderTest StorageConnectorProvider googleStorageConnectorProvider = getStorageConnectorProvider(properties); Assert.assertTrue(googleStorageConnectorProvider instanceof GoogleStorageConnectorProvider); - Assert.assertTrue(googleStorageConnectorProvider.get() instanceof GoogleStorageConnector); + 
Assert.assertTrue(googleStorageConnectorProvider.createStorageConnector(tempDir) instanceof GoogleStorageConnector); Assert.assertEquals("bucket", ((GoogleStorageConnectorProvider) googleStorageConnectorProvider).getBucket()); Assert.assertEquals("prefix", ((GoogleStorageConnectorProvider) googleStorageConnectorProvider).getPrefix()); Assert.assertEquals(new File("/tmp"), ((GoogleStorageConnectorProvider) googleStorageConnectorProvider).getTempDir()); @@ -124,10 +124,6 @@ public class GoogleStorageConnectorProviderTest StorageConnectorProvider.class, Names.named(CUSTOM_NAMESPACE) ); - - binder.bind(Key.get(StorageConnector.class, Names.named(CUSTOM_NAMESPACE))) - .toProvider(Key.get(StorageConnectorProvider.class, Names.named(CUSTOM_NAMESPACE))) - .in(LazySingleton.class); } ).withProperties(properties); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java index 687660ba750..7f5e4a0a2b0 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java @@ -35,6 +35,8 @@ import org.apache.druid.msq.querykit.QueryKitSpec; import org.apache.druid.query.Query; import org.apache.druid.server.DruidNode; +import java.io.File; + /** * Context used by multi-stage query controllers. Useful because it allows test fixtures to provide their own * implementations. @@ -104,6 +106,14 @@ public interface ControllerContext WorkerFailureListener workerFailureListener ); + /** + * Fetch a directory for temporary outputs + */ + default File taskTempDir() + { + throw new UnsupportedOperationException(); + } + /** * Client for communicating with workers. 
*/ diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 0043aaff294..0615ba802bf 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -573,7 +573,7 @@ public class ControllerImpl implements Controller final QueryDefinition queryDef = makeQueryDefinition( context.makeQueryKitSpec(makeQueryControllerToolKit(), queryId, querySpec, queryKernelConfig), querySpec, - context.jsonMapper(), + context, resultsContext ); @@ -1566,26 +1566,7 @@ public class ControllerImpl implements Controller } else if (MSQControllerTask.isExport(querySpec)) { // Write manifest file. ExportMSQDestination destination = (ExportMSQDestination) querySpec.getDestination(); - ExportMetadataManager exportMetadataManager = new ExportMetadataManager(destination.getExportStorageProvider()); - - final StageId finalStageId = queryKernel.getStageId(queryDef.getFinalStageDefinition().getStageNumber()); - //noinspection unchecked - - - Object resultObjectForStage = queryKernel.getResultObjectForStage(finalStageId); - if (!(resultObjectForStage instanceof List)) { - // This might occur if all workers are running on an older version. We are not able to write a manifest file in this case. - log.warn("Was unable to create manifest file due to "); - return; - } - @SuppressWarnings("unchecked") - List exportedFiles = (List) queryKernel.getResultObjectForStage(finalStageId); - log.info("Query [%s] exported %d files.", queryDef.getQueryId(), exportedFiles.size()); - exportMetadataManager.writeMetadata(exportedFiles); - } else if (MSQControllerTask.isExport(querySpec)) { - // Write manifest file. 
- ExportMSQDestination destination = (ExportMSQDestination) querySpec.getDestination(); - ExportMetadataManager exportMetadataManager = new ExportMetadataManager(destination.getExportStorageProvider()); + ExportMetadataManager exportMetadataManager = new ExportMetadataManager(destination.getExportStorageProvider(), context.taskTempDir()); final StageId finalStageId = queryKernel.getStageId(queryDef.getFinalStageDefinition().getStageNumber()); //noinspection unchecked @@ -1734,10 +1715,11 @@ public class ControllerImpl implements Controller private static QueryDefinition makeQueryDefinition( final QueryKitSpec queryKitSpec, final MSQSpec querySpec, - final ObjectMapper jsonMapper, + final ControllerContext controllerContext, final ResultsContext resultsContext ) { + final ObjectMapper jsonMapper = controllerContext.jsonMapper(); final MSQTuningConfig tuningConfig = querySpec.getTuningConfig(); final ColumnMappings columnMappings = querySpec.getColumnMappings(); final Query queryToPlan; @@ -1855,7 +1837,7 @@ public class ControllerImpl implements Controller try { // Check that the export destination is empty as a sanity check. We want to avoid modifying any other files with export. 
- Iterator filesIterator = exportStorageProvider.get().listDir(""); + Iterator filesIterator = exportStorageProvider.createStorageConnector(controllerContext.taskTempDir()).listDir(""); if (filesIterator.hasNext()) { throw DruidException.forPersona(DruidException.Persona.USER) .ofCategory(DruidException.Category.RUNTIME_FAILURE) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExportMetadataManager.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExportMetadataManager.java index 3b9d0296de5..00c5d7799de 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExportMetadataManager.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExportMetadataManager.java @@ -25,6 +25,7 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.storage.ExportStorageProvider; import org.apache.druid.storage.StorageConnector; +import java.io.File; import java.io.IOException; import java.io.OutputStreamWriter; import java.io.PrintWriter; @@ -42,15 +43,17 @@ public class ExportMetadataManager public static final int MANIFEST_FILE_VERSION = 1; private static final Logger log = new Logger(ExportMetadataManager.class); private final ExportStorageProvider exportStorageProvider; + private final File tmpDir; - public ExportMetadataManager(final ExportStorageProvider exportStorageProvider) + public ExportMetadataManager(final ExportStorageProvider exportStorageProvider, final File tmpDir) { this.exportStorageProvider = exportStorageProvider; + this.tmpDir = tmpDir; } public void writeMetadata(List exportedFiles) throws IOException { - final StorageConnector storageConnector = exportStorageProvider.get(); + final StorageConnector storageConnector = exportStorageProvider.createStorageConnector(tmpDir); log.info("Writing manifest file at location [%s]", exportStorageProvider.getBasePath()); if (storageConnector.pathExists(MANIFEST_FILE) 
|| storageConnector.pathExists(META_FILE)) { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQDurableStorageModule.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQDurableStorageModule.java index 7139377495a..55d60a6473e 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQDurableStorageModule.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQDurableStorageModule.java @@ -27,14 +27,12 @@ import com.google.inject.Key; import com.google.inject.multibindings.Multibinder; import org.apache.druid.discovery.NodeRole; import org.apache.druid.guice.JsonConfigProvider; -import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.annotations.Self; import org.apache.druid.indexing.overlord.duty.OverlordDuty; import org.apache.druid.initialization.DruidModule; import org.apache.druid.msq.indexing.cleaner.DurableStorageCleaner; import org.apache.druid.msq.indexing.cleaner.DurableStorageCleanerConfig; import org.apache.druid.storage.NilStorageConnector; -import org.apache.druid.storage.StorageConnector; import org.apache.druid.storage.StorageConnectorProvider; import java.util.List; @@ -84,10 +82,6 @@ public class MSQDurableStorageModule implements DruidModule MultiStageQuery.class ); - binder.bind(Key.get(StorageConnector.class, MultiStageQuery.class)) - .toProvider(Key.get(StorageConnectorProvider.class, MultiStageQuery.class)) - .in(LazySingleton.class); - if (nodeRoles.contains(NodeRole.OVERLORD)) { JsonConfigProvider.bind( binder, @@ -99,11 +93,9 @@ public class MSQDurableStorageModule implements DruidModule .addBinding() .to(DurableStorageCleaner.class); } - } else if (nodeRoles.contains(NodeRole.BROKER)) { - // bind with nil implementation so that configs are not required during service startups of broker since SQLStatementResource uses it. 
- binder.bind(Key.get(StorageConnector.class, MultiStageQuery.class)).toInstance(NilStorageConnector.getInstance()); } else { - // do nothing + // bind with nil implementation so that configs are not required during service startups. + binder.bind(Key.get(StorageConnectorProvider.class, MultiStageQuery.class)).toInstance(tempDir -> NilStorageConnector.getInstance()); } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java index ca93c673a4b..a4d7778a841 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java @@ -40,6 +40,7 @@ import org.apache.druid.msq.exec.SegmentSource; import org.apache.druid.msq.exec.WorkerClient; import org.apache.druid.msq.exec.WorkerFailureListener; import org.apache.druid.msq.exec.WorkerManager; +import org.apache.druid.msq.guice.MultiStageQuery; import org.apache.druid.msq.indexing.client.ControllerChatHandler; import org.apache.druid.msq.indexing.client.IndexerWorkerClient; import org.apache.druid.msq.indexing.error.MSQException; @@ -59,7 +60,10 @@ import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.segment.realtime.ChatHandler; import org.apache.druid.server.DruidNode; import org.apache.druid.server.lookup.cache.LookupLoadingSpec; +import org.apache.druid.storage.StorageConnector; +import org.apache.druid.storage.StorageConnectorProvider; +import java.io.File; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; @@ -91,12 +95,16 @@ public class IndexerControllerContext implements ControllerContext { this.task = task; this.toolbox = toolbox; - this.injector = injector; this.clientFactory = clientFactory; 
this.overlordClient = overlordClient; this.metricBuilder = new ServiceMetricEvent.Builder(); this.memoryIntrospector = injector.getInstance(MemoryIntrospector.class); IndexTaskUtils.setTaskDimensions(metricBuilder, task); + final StorageConnectorProvider storageConnectorProvider = injector.getInstance(Key.get(StorageConnectorProvider.class, MultiStageQuery.class)); + final StorageConnector storageConnector = storageConnectorProvider.createStorageConnector(toolbox.getIndexingTmpDir()); + this.injector = injector.createChildInjector( + binder -> binder.bind(Key.get(StorageConnector.class, MultiStageQuery.class)) + .toInstance(storageConnector)); } @Override @@ -212,6 +220,12 @@ public class IndexerControllerContext implements ControllerContext ); } + @Override + public File taskTempDir() + { + return toolbox.getIndexingTmpDir(); + } + @Override public QueryKitSpec makeQueryKitSpec( final QueryKit> queryKit, diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java index fbb0bff9556..a26eded4322 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java @@ -43,6 +43,7 @@ import org.apache.druid.msq.exec.WorkerClient; import org.apache.druid.msq.exec.WorkerContext; import org.apache.druid.msq.exec.WorkerMemoryParameters; import org.apache.druid.msq.exec.WorkerStorageParameters; +import org.apache.druid.msq.guice.MultiStageQuery; import org.apache.druid.msq.indexing.client.IndexerControllerClient; import org.apache.druid.msq.indexing.client.IndexerWorkerClient; import org.apache.druid.msq.indexing.client.WorkerChatHandler; @@ -61,6 +62,8 @@ import org.apache.druid.rpc.indexing.SpecificTaskServiceLocator; import 
org.apache.druid.segment.IndexIO; import org.apache.druid.segment.loading.SegmentCacheManager; import org.apache.druid.server.DruidNode; +import org.apache.druid.storage.StorageConnector; +import org.apache.druid.storage.StorageConnectorProvider; import java.io.File; import java.util.concurrent.ExecutorService; @@ -106,7 +109,6 @@ public class IndexerWorkerContext implements WorkerContext { this.task = task; this.toolbox = toolbox; - this.injector = injector; this.overlordClient = overlordClient; this.indexIO = indexIO; this.dataSegmentProvider = dataSegmentProvider; @@ -121,6 +123,11 @@ public class IndexerWorkerContext implements WorkerContext IndexerControllerContext.DEFAULT_MAX_CONCURRENT_STAGES ); this.includeAllCounters = MultiStageQueryContext.getIncludeAllCounters(queryContext); + final StorageConnectorProvider storageConnectorProvider = injector.getInstance(Key.get(StorageConnectorProvider.class, MultiStageQuery.class)); + final StorageConnector storageConnector = storageConnectorProvider.createStorageConnector(toolbox.getIndexingTmpDir()); + this.injector = injector.createChildInjector( + binder -> binder.bind(Key.get(StorageConnector.class, MultiStageQuery.class)) + .toInstance(storageConnector)); } public static IndexerWorkerContext createProductionInstance( diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/cleaner/DurableStorageCleaner.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/cleaner/DurableStorageCleaner.java index 630499bdd87..6b45273fa72 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/cleaner/DurableStorageCleaner.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/cleaner/DurableStorageCleaner.java @@ -32,6 +32,7 @@ import org.apache.druid.indexing.overlord.duty.OverlordDuty; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.msq.guice.MultiStageQuery; 
import org.apache.druid.storage.StorageConnector; +import org.apache.druid.storage.StorageConnectorProvider; import java.util.HashSet; import java.util.Iterator; @@ -55,12 +56,12 @@ public class DurableStorageCleaner implements OverlordDuty @Inject public DurableStorageCleaner( final DurableStorageCleanerConfig config, - final @MultiStageQuery StorageConnector storageConnector, + final @MultiStageQuery StorageConnectorProvider storageConnectorProvider, @JacksonInject final Provider taskMasterProvider ) { this.config = config; - this.storageConnector = storageConnector; + this.storageConnector = storageConnectorProvider.createStorageConnector(null); this.taskMasterProvider = taskMasterProvider; } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactory.java index fe2598a9514..d4ba2120c9c 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactory.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactory.java @@ -160,7 +160,7 @@ public class ExportResultsFrameProcessorFactory implements FrameProcessorFactory readableInput.getChannel(), exportFormat, readableInput.getChannelFrameReader(), - exportStorageProvider.get(), + exportStorageProvider.createStorageConnector(frameContext.tempDir()), frameContext.jsonMapper(), channelCounter, getExportFilePath(queryId, workerNumber, readableInput.getStagePartition().getPartitionNumber(), exportFormat), diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java index f5662280478..c92bfa955fb 100644 --- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/resources/SqlStatementResource.java @@ -91,6 +91,7 @@ import org.apache.druid.sql.http.SqlQuery; import org.apache.druid.sql.http.SqlResource; import org.apache.druid.storage.NilStorageConnector; import org.apache.druid.storage.StorageConnector; +import org.apache.druid.storage.StorageConnectorProvider; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import javax.servlet.http.HttpServletRequest; @@ -134,14 +135,14 @@ public class SqlStatementResource final @MultiStageQuery SqlStatementFactory msqSqlStatementFactory, final ObjectMapper jsonMapper, final OverlordClient overlordClient, - final @MultiStageQuery StorageConnector storageConnector, + final @MultiStageQuery StorageConnectorProvider storageConnectorProvider, final AuthorizerMapper authorizerMapper ) { this.msqSqlStatementFactory = msqSqlStatementFactory; this.jsonMapper = jsonMapper; this.overlordClient = overlordClient; - this.storageConnector = storageConnector; + this.storageConnector = storageConnectorProvider.createStorageConnector(null); this.authorizerMapper = authorizerMapper; } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/DurableStorageCleanerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/DurableStorageCleanerTest.java index 3da4adcbc3f..0ddf807d707 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/DurableStorageCleanerTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/DurableStorageCleanerTest.java @@ -29,6 +29,7 @@ import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; import org.apache.druid.indexing.overlord.duty.DutySchedule; import org.apache.druid.msq.indexing.cleaner.DurableStorageCleaner; import 
org.apache.druid.msq.indexing.cleaner.DurableStorageCleanerConfig; +import org.apache.druid.storage.NilStorageConnector; import org.apache.druid.storage.StorageConnector; import org.easymock.Capture; import org.easymock.EasyMock; @@ -59,7 +60,7 @@ public class DurableStorageCleanerTest durableStorageCleanerConfig.enabled = true; durableStorageCleaner = new DurableStorageCleaner( durableStorageCleanerConfig, - STORAGE_CONNECTOR, + s -> STORAGE_CONNECTOR, () -> TASK_MASTER ); } @@ -126,7 +127,7 @@ public class DurableStorageCleanerTest DurableStorageCleanerConfig cleanerConfig = new DurableStorageCleanerConfig(); cleanerConfig.delaySeconds = 10L; cleanerConfig.enabled = true; - DurableStorageCleaner durableStorageCleaner = new DurableStorageCleaner(cleanerConfig, null, null); + DurableStorageCleaner durableStorageCleaner = new DurableStorageCleaner(cleanerConfig, (temp) -> NilStorageConnector.getInstance(), null); DutySchedule schedule = durableStorageCleaner.getSchedule(); Assert.assertEquals(cleanerConfig.delaySeconds * 1000, schedule.getPeriodMillis()); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java index 8de80cf109f..7ac389b9803 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java @@ -22,12 +22,16 @@ package org.apache.druid.msq.indexing; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Futures; import com.google.inject.Injector; +import com.google.inject.Key; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.msq.exec.Worker; +import 
org.apache.druid.msq.guice.MultiStageQuery; import org.apache.druid.rpc.ServiceLocation; import org.apache.druid.rpc.ServiceLocations; import org.apache.druid.rpc.ServiceLocator; +import org.apache.druid.storage.NilStorageConnector; +import org.apache.druid.storage.StorageConnectorProvider; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -45,6 +49,8 @@ public class IndexerWorkerContextTest final Injector injectorMock = Mockito.mock(Injector.class); Mockito.when(injectorMock.getInstance(SegmentCacheManagerFactory.class)) .thenReturn(Mockito.mock(SegmentCacheManagerFactory.class)); + Mockito.when(injectorMock.getInstance(Key.get(StorageConnectorProvider.class, MultiStageQuery.class))) + .thenReturn(defaultTempDir -> NilStorageConnector.getInstance()); final MSQWorkerTask task = Mockito.mock(MSQWorkerTask.class, Mockito.withSettings().strictness(Strictness.STRICT_STUBS)); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java index cef3e00daa2..656086cef43 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlMSQStatementResourcePostTest.java @@ -54,7 +54,6 @@ import org.junit.jupiter.api.Test; import javax.ws.rs.core.Response; import javax.ws.rs.core.StreamingOutput; - import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; @@ -76,7 +75,7 @@ public class SqlMSQStatementResourcePostTest extends MSQTestBase sqlStatementFactory, objectMapper, indexingServiceClient, - localFileStorageConnector, + s -> localFileStorageConnector, authorizerMapper ); } @@ -330,7 +329,7 @@ public class SqlMSQStatementResourcePostTest extends MSQTestBase 
sqlStatementFactory, objectMapper, indexingServiceClient, - NilStorageConnector.getInstance(), + s -> NilStorageConnector.getInstance(), authorizerMapper ); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java index a79edee53b8..40dcb303b1d 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java @@ -81,7 +81,6 @@ import org.apache.druid.sql.calcite.planner.ColumnMappings; import org.apache.druid.sql.calcite.util.CalciteTests; import org.apache.druid.sql.http.ResultFormat; import org.apache.druid.sql.http.SqlResourceTest; -import org.apache.druid.storage.local.LocalFileStorageConnector; import org.jboss.netty.handler.codec.http.DefaultHttpResponse; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.codec.http.HttpVersion; @@ -685,7 +684,7 @@ public class SqlStatementResourceTest extends MSQTestBase } @BeforeEach - public void init() throws Exception + public void init() { overlordClient = Mockito.mock(OverlordClient.class); setupMocks(overlordClient); @@ -693,7 +692,7 @@ public class SqlStatementResourceTest extends MSQTestBase sqlStatementFactory, objectMapper, overlordClient, - new LocalFileStorageConnector(newTempFolder("local")), + tempDir -> localFileStorageConnector, authorizerMapper ); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java index 4dadeae5bc1..22c7cff8847 100644 --- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java @@ -40,6 +40,7 @@ import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.io.Closer; @@ -72,6 +73,7 @@ import org.mockito.ArgumentMatchers; import org.mockito.Mockito; import javax.annotation.Nullable; +import java.io.File; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -92,6 +94,7 @@ public class MSQTestControllerContext implements ControllerContext NUM_WORKERS, "MultiStageQuery-test-controller-client" )); + private final File tempDir = FileUtils.createTempDir(); private final CoordinatorClient coordinatorClient; private final DruidNode node = new DruidNode( "controller", @@ -360,6 +363,12 @@ public class MSQTestControllerContext implements ControllerContext ); } + @Override + public File taskTempDir() + { + return tempDir; + } + @Override public void registerController(Controller controller, Closer closer) { diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportConfig.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportConfig.java index d5477c2998e..fec5776aedc 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportConfig.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportConfig.java @@ -39,7 +39,7 @@ public class S3ExportConfig @JsonCreator public S3ExportConfig( - 
@JsonProperty("tempLocalDir") final String tempLocalDir, + @JsonProperty("tempLocalDir") @Nullable final String tempLocalDir, @JsonProperty("chunkSize") @Nullable final HumanReadableBytes chunkSize, @JsonProperty("maxRetry") @Nullable final Integer maxRetry, @JsonProperty("allowedExportPaths") final List allowedExportPaths) @@ -50,6 +50,7 @@ public class S3ExportConfig this.allowedExportPaths = allowedExportPaths; } + @Nullable public String getTempLocalDir() { return tempLocalDir; diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportStorageProvider.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportStorageProvider.java index ca599fc9d49..129622dfbd7 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportStorageProvider.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportStorageProvider.java @@ -69,14 +69,14 @@ public class S3ExportStorageProvider implements ExportStorageProvider this.prefix = prefix; } + @Override - public StorageConnector get() + public StorageConnector createStorageConnector(File taskTempDir) { - final String tempDir = s3ExportConfig.getTempLocalDir(); - if (tempDir == null) { - throw DruidException.forPersona(DruidException.Persona.OPERATOR) - .ofCategory(DruidException.Category.NOT_FOUND) - .build("The runtime property `druid.export.storage.s3.tempLocalDir` must be configured for S3 export."); + final String exportConfigTempDir = s3ExportConfig.getTempLocalDir(); + final File tempDirFile = exportConfigTempDir != null ? 
new File(exportConfigTempDir) : taskTempDir; + if (tempDirFile == null) { + throw DruidException.defensive("Couldn't find temporary directory for export."); } final List allowedExportPaths = s3ExportConfig.getAllowedExportPaths(); if (allowedExportPaths == null) { @@ -89,7 +89,7 @@ public class S3ExportStorageProvider implements ExportStorageProvider final S3OutputConfig s3OutputConfig = new S3OutputConfig( bucket, prefix, - new File(tempDir), + tempDirFile, s3ExportConfig.getChunkSize(), s3ExportConfig.getMaxRetry() ); diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3OutputConfig.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3OutputConfig.java index 35e228f7ef3..cacbe4272e6 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3OutputConfig.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3OutputConfig.java @@ -40,6 +40,7 @@ public class S3OutputConfig @JsonProperty private String prefix; + @Nullable @JsonProperty private File tempDir; @@ -57,7 +58,7 @@ public class S3OutputConfig public S3OutputConfig( @JsonProperty(value = "bucket", required = true) String bucket, @JsonProperty(value = "prefix", required = true) String prefix, - @JsonProperty(value = "tempDir", required = true) File tempDir, + @JsonProperty(value = "tempDir") @Nullable File tempDir, @JsonProperty("chunkSize") HumanReadableBytes chunkSize, @JsonProperty("maxRetry") Integer maxRetry ) @@ -69,6 +70,7 @@ public class S3OutputConfig protected S3OutputConfig( String bucket, String prefix, + @Nullable File tempDir, @Nullable HumanReadableBytes chunkSize, @@ -120,6 +122,7 @@ public class S3OutputConfig return prefix; } + @Nullable public File getTempDir() { return tempDir; @@ -135,6 +138,11 @@ public class S3OutputConfig return maxRetry; } + public S3OutputConfig withTempDir(File tempDir) + { + return new S3OutputConfig(bucket, prefix, 
tempDir, chunkSize, maxRetry); + } + private static void validateChunkSize(long chunkSize) { if (S3_MULTIPART_UPLOAD_MAX_PART_SIZE_BYTES < chunkSize) { diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnectorProvider.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnectorProvider.java index f86aee9a1aa..57f131746f0 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnectorProvider.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnectorProvider.java @@ -30,6 +30,7 @@ import org.apache.druid.storage.StorageConnectorProvider; import org.apache.druid.storage.s3.S3StorageDruidModule; import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3; +import javax.annotation.Nullable; import java.io.File; @JsonTypeName(S3StorageDruidModule.SCHEME) @@ -45,7 +46,7 @@ public class S3StorageConnectorProvider extends S3OutputConfig implements Storag public S3StorageConnectorProvider( @JsonProperty(value = "bucket", required = true) String bucket, @JsonProperty(value = "prefix", required = true) String prefix, - @JsonProperty(value = "tempDir", required = true) File tempDir, + @JsonProperty(value = "tempDir") @Nullable File tempDir, @JsonProperty("chunkSize") HumanReadableBytes chunkSize, @JsonProperty("maxRetry") Integer maxRetry ) @@ -54,8 +55,9 @@ public class S3StorageConnectorProvider extends S3OutputConfig implements Storag } @Override - public StorageConnector get() + public StorageConnector createStorageConnector(File defaultTempDir) { - return new S3StorageConnector(this, s3, s3UploadManager); + S3OutputConfig config = this.getTempDir() == null ? 
this.withTempDir(defaultTempDir) : this; + return new S3StorageConnector(config, s3, s3UploadManager); } } diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageConnectorProviderTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageConnectorProviderTest.java index a880d6f2efa..3210a26cc58 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageConnectorProviderTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageConnectorProviderTest.java @@ -29,12 +29,11 @@ import com.google.inject.ProvisionException; import com.google.inject.name.Names; import org.apache.druid.common.aws.AWSModule; import org.apache.druid.guice.JsonConfigProvider; -import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.StartupInjectorBuilder; +import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.HumanReadableBytes; import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.query.DruidProcessingConfigTest; -import org.apache.druid.storage.StorageConnector; import org.apache.druid.storage.StorageConnectorModule; import org.apache.druid.storage.StorageConnectorProvider; import org.apache.druid.storage.s3.output.S3ExportConfig; @@ -54,6 +53,7 @@ public class S3StorageConnectorProviderTest { private static final String CUSTOM_NAMESPACE = "custom"; + private final File tempDir = FileUtils.createTempDir(); @Test public void createS3StorageFactoryWithRequiredProperties() @@ -67,7 +67,7 @@ public class S3StorageConnectorProviderTest StorageConnectorProvider s3StorageConnectorProvider = getStorageConnectorProvider(properties); Assert.assertTrue(s3StorageConnectorProvider instanceof S3StorageConnectorProvider); - Assert.assertTrue(s3StorageConnectorProvider.get() instanceof S3StorageConnector); + 
Assert.assertTrue(s3StorageConnectorProvider.createStorageConnector(tempDir) instanceof S3StorageConnector); Assert.assertEquals("bucket", ((S3StorageConnectorProvider) s3StorageConnectorProvider).getBucket()); Assert.assertEquals("prefix", ((S3StorageConnectorProvider) s3StorageConnectorProvider).getPrefix()); Assert.assertEquals(new File("/tmp"), ((S3StorageConnectorProvider) s3StorageConnectorProvider).getTempDir()); @@ -115,9 +115,9 @@ public class S3StorageConnectorProviderTest properties.setProperty(CUSTOM_NAMESPACE + ".prefix", "prefix"); Assert.assertThrows( - "Missing required creator property 'tempDir'", - ProvisionException.class, - () -> getStorageConnectorProvider(properties) + "tempDir is null in s3 config", + NullPointerException.class, + () -> getStorageConnectorProvider(properties).createStorageConnector(null) ); } @@ -138,10 +138,6 @@ public class S3StorageConnectorProviderTest StorageConnectorProvider.class, Names.named(CUSTOM_NAMESPACE) ); - - binder.bind(Key.get(StorageConnector.class, Names.named(CUSTOM_NAMESPACE))) - .toProvider(Key.get(StorageConnectorProvider.class, Names.named(CUSTOM_NAMESPACE))) - .in(LazySingleton.class); } } ).withProperties(properties); diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3OutputSerdeTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3OutputSerdeTest.java index 72ea888615f..3e247653713 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3OutputSerdeTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3OutputSerdeTest.java @@ -97,20 +97,6 @@ public class S3OutputSerdeTest MAPPER.readValue(json, S3OutputConfig.class); } - @Test - public void noTempDir() throws JsonProcessingException - { - String json = jsonStringReadyForAssert("{\n" - + " \"prefix\": \"abc\",\n" - + " \"bucket\": \"TEST\",\n" - + " \"chunkSize\":104857600,\n" - + " 
\"maxRetry\": 2\n" - + "}\n"); - expectedException.expect(MismatchedInputException.class); - expectedException.expectMessage("Missing required creator property 'tempDir'"); - MAPPER.readValue(json, S3OutputConfig.class); - } - @Test public void leastArguments() throws JsonProcessingException { diff --git a/processing/src/main/java/org/apache/druid/storage/ExportStorageProvider.java b/processing/src/main/java/org/apache/druid/storage/ExportStorageProvider.java index 173544e2f8d..d19b2f5f7e1 100644 --- a/processing/src/main/java/org/apache/druid/storage/ExportStorageProvider.java +++ b/processing/src/main/java/org/apache/druid/storage/ExportStorageProvider.java @@ -20,10 +20,11 @@ package org.apache.druid.storage; import com.fasterxml.jackson.annotation.JsonTypeInfo; -import com.google.inject.Provider; + +import java.io.File; @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") -public interface ExportStorageProvider extends Provider +public interface ExportStorageProvider { String getResourceType(); @@ -33,4 +34,6 @@ public interface ExportStorageProvider extends Provider String getBasePath(); String getFilePathForManifest(String fileName); + + StorageConnector createStorageConnector(File taskTempDir); } diff --git a/processing/src/main/java/org/apache/druid/storage/StorageConnectorProvider.java b/processing/src/main/java/org/apache/druid/storage/StorageConnectorProvider.java index 9fece71eab8..a2023e6aa94 100644 --- a/processing/src/main/java/org/apache/druid/storage/StorageConnectorProvider.java +++ b/processing/src/main/java/org/apache/druid/storage/StorageConnectorProvider.java @@ -20,9 +20,20 @@ package org.apache.druid.storage; import com.fasterxml.jackson.annotation.JsonTypeInfo; -import com.google.inject.Provider; + +import java.io.File; @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") -public interface StorageConnectorProvider extends Provider +public interface StorageConnectorProvider { + /** + * Returns the storage connector. 
Takes a parameter defaultTempDir to be possibly used as the temporary directory, if the + * storage connector requires one. This StorageConnectorProvider is not guaranteed to use this value, even if the + * StorageConnectorProvider requires one, as it gives priority to a value of defaultTempDir configured as a runtime + * configuration. + *
+ * This value needs to be passed instead of injected by Jackson as the default temporary directory is dependent on the + * task id, and such dynamic task specific bindings are not possible on indexers. + */ + StorageConnector createStorageConnector(File defaultTempDir); } diff --git a/processing/src/main/java/org/apache/druid/storage/local/LocalFileExportStorageProvider.java b/processing/src/main/java/org/apache/druid/storage/local/LocalFileExportStorageProvider.java index 74b099aef88..8755b937329 100644 --- a/processing/src/main/java/org/apache/druid/storage/local/LocalFileExportStorageProvider.java +++ b/processing/src/main/java/org/apache/druid/storage/local/LocalFileExportStorageProvider.java @@ -54,7 +54,7 @@ public class LocalFileExportStorageProvider implements ExportStorageProvider } @Override - public StorageConnector get() + public StorageConnector createStorageConnector(File taskTempDir) { final File exportDestination = validateAndGetPath(storageConfig.getBaseDir(), exportPath); try { diff --git a/processing/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnectorProvider.java b/processing/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnectorProvider.java index 82d1623f840..7c66f790627 100644 --- a/processing/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnectorProvider.java +++ b/processing/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnectorProvider.java @@ -45,7 +45,7 @@ public class LocalFileStorageConnectorProvider implements StorageConnectorProvid } @Override - public StorageConnector get() + public StorageConnector createStorageConnector(File defaultTempDir) { try { return new LocalFileStorageConnector(basePath); diff --git a/processing/src/test/java/org/apache/druid/storage/StorageConnectorModuleTest.java b/processing/src/test/java/org/apache/druid/storage/StorageConnectorModuleTest.java index df9a88d4813..46cc01760eb 100644 ---
a/processing/src/test/java/org/apache/druid/storage/StorageConnectorModuleTest.java +++ b/processing/src/test/java/org/apache/druid/storage/StorageConnectorModuleTest.java @@ -46,8 +46,9 @@ public class StorageConnectorModuleTest public void testJsonSerde() throws JsonProcessingException { StorageConnectorProvider storageConnectorProvider = objectMapper.readValue(JSON, StorageConnectorProvider.class); - Assert.assertTrue(storageConnectorProvider.get() instanceof LocalFileStorageConnector); - Assert.assertEquals(new File("/tmp"), ((LocalFileStorageConnector) storageConnectorProvider.get()).getBasePath()); + StorageConnector storageConnector = storageConnectorProvider.createStorageConnector(new File("/tmp/tmpDir")); + Assert.assertTrue(storageConnector instanceof LocalFileStorageConnector); + Assert.assertEquals(new File("/tmp"), ((LocalFileStorageConnector) storageConnector).getBasePath()); } diff --git a/processing/src/test/java/org/apache/druid/storage/local/LocalFileStorageConnectorTest.java b/processing/src/test/java/org/apache/druid/storage/local/LocalFileStorageConnectorTest.java index 4bc8886f999..d6eec86723b 100644 --- a/processing/src/test/java/org/apache/druid/storage/local/LocalFileStorageConnectorTest.java +++ b/processing/src/test/java/org/apache/druid/storage/local/LocalFileStorageConnectorTest.java @@ -49,14 +49,14 @@ public class LocalFileStorageConnectorTest @Rule public ExpectedException expectedException = ExpectedException.none(); - private File tempDir; + private File storageDir; private StorageConnector storageConnector; @Before public void init() throws IOException { - tempDir = temporaryFolder.newFolder(); - storageConnector = new LocalFileStorageConnectorProvider(tempDir).get(); + storageDir = temporaryFolder.newFolder(); + storageConnector = new LocalFileStorageConnectorProvider(storageDir).createStorageConnector(null); } @Test @@ -69,14 +69,14 @@ public class LocalFileStorageConnectorTest // check if file is created 
Assert.assertTrue(storageConnector.pathExists(uuid)); - Assert.assertTrue(new File(tempDir.getAbsolutePath(), uuid).exists()); + Assert.assertTrue(new File(storageDir.getAbsolutePath(), uuid).exists()); // check contents checkContents(uuid); // delete file storageConnector.deleteFile(uuid); - Assert.assertFalse(new File(tempDir.getAbsolutePath(), uuid).exists()); + Assert.assertFalse(new File(storageDir.getAbsolutePath(), uuid).exists()); } @Test @@ -96,14 +96,14 @@ public class LocalFileStorageConnectorTest checkContents(uuid1); checkContents(uuid2); - File baseFile = new File(tempDir.getAbsolutePath(), uuid_base); + File baseFile = new File(storageDir.getAbsolutePath(), uuid_base); Assert.assertTrue(baseFile.exists()); Assert.assertTrue(baseFile.isDirectory()); Assert.assertEquals(2, baseFile.listFiles().length); storageConnector.deleteRecursively(uuid_base); Assert.assertFalse(baseFile.exists()); - Assert.assertTrue(new File(tempDir.getAbsolutePath(), topLevelDir).exists()); + Assert.assertTrue(new File(storageDir.getAbsolutePath(), topLevelDir).exists()); } @Test @@ -118,8 +118,8 @@ public class LocalFileStorageConnectorTest // delete file storageConnector.deleteFiles(ImmutableList.of(uuid1, uuid2)); - Assert.assertFalse(new File(tempDir.getAbsolutePath(), uuid1).exists()); - Assert.assertFalse(new File(tempDir.getAbsolutePath(), uuid2).exists()); + Assert.assertFalse(new File(storageDir.getAbsolutePath(), uuid1).exists()); + Assert.assertFalse(new File(storageDir.getAbsolutePath(), uuid2).exists()); } @Test @@ -128,7 +128,7 @@ public class LocalFileStorageConnectorTest File file = temporaryFolder.newFile(); expectedException.expect(IAE.class); StorageConnectorProvider storageConnectorProvider = new LocalFileStorageConnectorProvider(file); - storageConnectorProvider.get(); + storageConnectorProvider.createStorageConnector(null); } @Test