Make tempStorageDirectory configuration optional and rely on task dir instead (#17015)

Currently, durable storage and export both require configuring a temporary directory, via `druid.msq.intermediate.storage.tempDir` and `druid.export.storage.<connectorType>.tempLocalDir` respectively.

Tasks on the middle manager already have a configured temporary directory. This PR uses that task directory as the default whenever the property is not explicitly set, reducing the number of configs a user has to provide.

Note that on tasks, preference is given to the user-configured `druid.*.storage.temp*Dir`; only when that is absent do we fall back to the task's temporary directory.

Overlord and broker services also require storage connector configurations (for the `DurableStorageCleaner` overlord duty and for fetching async query results, respectively), but they have no default task temporary directory, so the configuration is still required for these services.
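On tasks, the selection logic reduces to the following sketch (a hypothetical helper for illustration; the actual providers copy their output config via a new `withTempDir` method, as the diff below shows):

```java
import java.io.File;

class TempDirResolver
{
    // Prefer the user-configured druid.*.storage.temp*Dir; otherwise fall
    // back to the task's temporary directory supplied by the caller.
    static File resolve(File configuredTempDir, File taskTempDir)
    {
        if (configuredTempDir != null) {
            return configuredTempDir;
        }
        if (taskTempDir == null) {
            // Overlord/broker case: there is no task directory, so the
            // runtime property remains mandatory on those services.
            throw new IllegalStateException("No temporary directory available");
        }
        return taskTempDir;
    }
}
```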
Committed by Adarsh Sanjeev on 2024-10-29 13:36:59 +05:30 (via GitHub).
parent 6a9c050095
commit b7c661b801
37 changed files with 213 additions and 167 deletions


@ -155,8 +155,8 @@ The following runtime parameters must be configured to export into an S3 destina
| Runtime Parameter | Required | Description | Default |
|----------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----|
| `druid.export.storage.s3.tempLocalDir` | Yes | Directory used on the local storage of the worker to store temporary files required while uploading the data. | n/a |
| `druid.export.storage.s3.allowedExportPaths` | Yes | An array of S3 prefixes that are whitelisted as export destinations. Export queries fail if the export destination does not match any of the configured prefixes. Example: `[\"s3://bucket1/export/\", \"s3://bucket2/export/\"]` | n/a |
| `druid.export.storage.s3.tempLocalDir` | No | Directory used on the local storage of the worker to store temporary files required while uploading the data. Uses the task temporary directory by default. | n/a |
| `druid.export.storage.s3.maxRetry` | No | Defines the max number of times to attempt S3 API calls to avoid failures due to transient errors. | 10 |
| `druid.export.storage.s3.chunkSize` | No | Defines the size of each chunk to temporarily store in `tempDir`. The chunk size must be between 5 MiB and 5 GiB. A large chunk size reduces the API calls to S3, however it requires more disk space to store the temporary chunks. | 100MiB |
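With `tempLocalDir` now optional on tasks, a minimal S3 export configuration might look like the following (bucket names and paths are illustrative):

```properties
druid.export.storage.s3.allowedExportPaths=["s3://bucket1/export/"]
# Optional since this change; on tasks it defaults to the task temporary directory:
# druid.export.storage.s3.tempLocalDir=/tmp/s3-export
```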
@ -186,12 +186,12 @@ Supported arguments for the function:
The following runtime parameters must be configured to export into a GCS destination:
| Runtime Parameter | Required | Description | Default |
|--------------------------------------------------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|
| `druid.export.storage.google.tempLocalDir` | Yes | Directory used on the local storage of the worker to store temporary files required while uploading the data. | n/a |
| Runtime Parameter | Required | Description | Default |
|--------------------------------------------------|----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|
| `druid.export.storage.google.allowedExportPaths` | Yes | An array of GS prefixes that are allowed as export destinations. Export queries fail if the export destination does not match any of the configured prefixes. Example: `[\"gs://bucket1/export/\", \"gs://bucket2/export/\"]` | n/a |
| `druid.export.storage.google.maxRetry` | No | Defines the max number of times to attempt GS API calls to avoid failures due to transient errors. | 10 |
| `druid.export.storage.google.chunkSize` | No | Defines the size of each chunk to temporarily store in `tempDir`. A large chunk size reduces the API calls to GS; however, it requires more disk space to store the temporary chunks. | 4MiB |
| `druid.export.storage.google.tempLocalDir` | No | Directory used on the local storage of the worker to store temporary files required while uploading the data. Uses the task temporary directory by default. | n/a |
| `druid.export.storage.google.maxRetry` | No | Defines the max number of times to attempt GS API calls to avoid failures due to transient errors. | 10 |
| `druid.export.storage.google.chunkSize` | No | Defines the size of each chunk to temporarily store in `tempDir`. A large chunk size reduces the API calls to GS; however, it requires more disk space to store the temporary chunks. | 4MiB |
##### LOCAL
@ -531,7 +531,7 @@ Common properties to configure the behavior of durable storage
|--|--|--|
|`druid.msq.intermediate.storage.enable` | Yes | Whether to enable durable storage for the cluster. Set it to true to enable durable storage. For more information about enabling durable storage, see [Durable storage](../operations/durable-storage.md). | false |
|`druid.msq.intermediate.storage.type` | Yes | The type of storage to use. Set it to `s3` for S3, `azure` for Azure, and `google` for Google. | n/a |
|`druid.msq.intermediate.storage.tempDir`| Yes | Directory path on the local disk to store temporary files required while uploading and downloading the data | n/a |
|`druid.msq.intermediate.storage.tempDir`| Yes | Directory path on the local disk to store temporary files required while uploading and downloading the data. If the property is not configured on the indexer or middle manager, it defaults to using the task temporary directory. | n/a |
|`druid.msq.intermediate.storage.maxRetry` | No | Defines the max number of times to attempt S3 API calls to avoid failures due to transient errors. | 10 |
|`druid.msq.intermediate.storage.chunkSize` | No | Defines the size of each chunk to temporarily store in `druid.msq.intermediate.storage.tempDir`. The chunk size must be between 5 MiB and 5 GiB. A large chunk size reduces the API calls made to the durable storage, however it requires more disk space to store the temporary chunks. Druid uses a default of 100MiB if the value is not provided.| 100MiB |
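Similarly, a minimal durable storage configuration on indexers or middle managers can now omit the temporary directory (values are illustrative; `bucket` and `prefix` are connector-specific properties not shown in the table above):

```properties
druid.msq.intermediate.storage.enable=true
druid.msq.intermediate.storage.type=s3
druid.msq.intermediate.storage.bucket=my-bucket
druid.msq.intermediate.storage.prefix=durable
# Optional on indexers and middle managers since this change; still required
# on the overlord for the DurableStorageCleaner duty:
# druid.msq.intermediate.storage.tempDir=/tmp/intermediate
```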


@ -42,6 +42,7 @@ public class AzureOutputConfig
@JsonProperty
private final String prefix;
@Nullable
@JsonProperty
private final File tempDir;
@ -64,7 +65,7 @@ public class AzureOutputConfig
public AzureOutputConfig(
@JsonProperty(value = "container", required = true) String container,
@JsonProperty(value = "prefix", required = true) String prefix,
@JsonProperty(value = "tempDir", required = true) File tempDir,
@JsonProperty(value = "tempDir") @Nullable File tempDir,
@JsonProperty(value = "chunkSize") @Nullable HumanReadableBytes chunkSize,
@JsonProperty(value = "maxRetry") @Nullable Integer maxRetry
)
@ -77,7 +78,6 @@ public class AzureOutputConfig
validateFields();
}
public String getContainer()
{
return container;
@ -88,6 +88,7 @@ public class AzureOutputConfig
return prefix;
}
@Nullable
public File getTempDir()
{
return tempDir;
@ -103,6 +104,11 @@ public class AzureOutputConfig
return maxRetry;
}
public AzureOutputConfig withTempDir(File tempDir)
{
return new AzureOutputConfig(container, prefix, tempDir, chunkSize, maxRetry);
}
private void validateFields()
{
if (chunkSize.getBytes() < AZURE_MIN_CHUNK_SIZE_BYTES || chunkSize.getBytes() > AZURE_MAX_CHUNK_SIZE_BYTES) {
@ -113,6 +119,13 @@ public class AzureOutputConfig
AZURE_MAX_CHUNK_SIZE_BYTES
);
}
}
public void validateTempDirectory()
{
if (tempDir == null) {
throw DruidException.defensive("The runtime property `druid.msq.intermediate.storage.tempDir` must be configured.");
}
try {
FileUtils.mkdirp(tempDir);


@ -45,7 +45,7 @@ public class AzureStorageConnectorProvider extends AzureOutputConfig implements
public AzureStorageConnectorProvider(
@JsonProperty(value = "container", required = true) String container,
@JsonProperty(value = "prefix", required = true) String prefix,
@JsonProperty(value = "tempDir", required = true) File tempDir,
@JsonProperty(value = "tempDir") @Nullable File tempDir,
@JsonProperty(value = "chunkSize") @Nullable HumanReadableBytes chunkSize,
@JsonProperty(value = "maxRetry") @Nullable Integer maxRetry
)
@ -54,8 +54,10 @@ public class AzureStorageConnectorProvider extends AzureOutputConfig implements
}
@Override
public StorageConnector get()
public StorageConnector createStorageConnector(final File defaultTempDir)
{
return new AzureStorageConnector(this, azureStorage);
AzureOutputConfig config = this.getTempDir() == null ? this.withTempDir(defaultTempDir) : this;
config.validateTempDirectory();
return new AzureStorageConnector(config, azureStorage);
}
}


@ -57,10 +57,9 @@ public class AzureOutputConfigTest
throw new ISE("Unable to change the permission of temp folder for %s", this.getClass().getName());
}
//noinspection ResultOfObjectAllocationIgnored
assertThrows(
DruidException.class,
() -> new AzureOutputConfig(CONTAINER, PREFIX, tempDir, null, MAX_RETRY_COUNT)
() -> new AzureOutputConfig(CONTAINER, PREFIX, tempDir, null, MAX_RETRY_COUNT).validateTempDirectory()
);
}


@ -86,18 +86,6 @@ public class AzureOutputSerdeTest
assertThrows(MismatchedInputException.class, () -> MAPPER.readValue(json, AzureOutputConfig.class));
}
@Test
public void noTempDir()
{
String json = jsonStringReadyForAssert("{\n"
+ " \"prefix\": \"abc\",\n"
+ " \"container\": \"TEST\",\n"
+ " \"chunkSize\":104857600,\n"
+ " \"maxRetry\": 2\n"
+ "}\n");
assertThrows(MismatchedInputException.class, () -> MAPPER.readValue(json, AzureOutputConfig.class));
}
@Test
public void leastArguments() throws JsonProcessingException
{


@ -25,10 +25,9 @@ import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.ProvisionException;
import com.google.inject.name.Names;
import org.apache.druid.error.DruidException;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.StartupInjectorBuilder;
import org.apache.druid.storage.StorageConnector;
import org.apache.druid.storage.StorageConnectorModule;
import org.apache.druid.storage.StorageConnectorProvider;
import org.apache.druid.storage.azure.AzureStorage;
@ -59,7 +58,7 @@ public class AzureStorageConnectorProviderTest
StorageConnectorProvider storageConnectorProvider = getStorageConnectorProvider(properties);
assertInstanceOf(AzureStorageConnectorProvider.class, storageConnectorProvider);
assertInstanceOf(AzureStorageConnector.class, storageConnectorProvider.get());
assertInstanceOf(AzureStorageConnector.class, storageConnectorProvider.createStorageConnector(new File("/tmp")));
assertEquals("container", ((AzureStorageConnectorProvider) storageConnectorProvider).getContainer());
assertEquals("prefix", ((AzureStorageConnectorProvider) storageConnectorProvider).getPrefix());
assertEquals(new File("/tmp"),
@ -107,30 +106,36 @@ public class AzureStorageConnectorProviderTest
properties.setProperty(CUSTOM_NAMESPACE + ".prefix", "prefix");
assertThrows(
ProvisionException.class,
() -> getStorageConnectorProvider(properties),
"Missing required creator property 'tempDir'"
DruidException.class,
() -> getStorageConnectorProvider(properties).createStorageConnector(null),
"The runtime property `druid.msq.intermediate.storage.tempDir` must be configured."
);
}
@Test
public void createAzureStorageFactoryWithMissingTempDirButProvidedDuringRuntime()
{
final Properties properties = new Properties();
properties.setProperty(CUSTOM_NAMESPACE + ".type", "azure");
properties.setProperty(CUSTOM_NAMESPACE + ".container", "container");
properties.setProperty(CUSTOM_NAMESPACE + ".prefix", "prefix");
getStorageConnectorProvider(properties).createStorageConnector(new File("/tmp"));
}
private StorageConnectorProvider getStorageConnectorProvider(Properties properties)
{
StartupInjectorBuilder startupInjectorBuilder = new StartupInjectorBuilder().add(
new AzureStorageDruidModule(),
new StorageConnectorModule(),
new AzureStorageConnectorModule(),
binder -> {
JsonConfigProvider.bind(
binder,
CUSTOM_NAMESPACE,
StorageConnectorProvider.class,
Names.named(CUSTOM_NAMESPACE)
);
binder.bind(Key.get(StorageConnector.class, Names.named(CUSTOM_NAMESPACE)))
.toProvider(Key.get(StorageConnectorProvider.class, Names.named(CUSTOM_NAMESPACE)))
.in(LazySingleton.class);
}
binder -> JsonConfigProvider.bind(
binder,
CUSTOM_NAMESPACE,
StorageConnectorProvider.class,
Names.named(CUSTOM_NAMESPACE)
)
).withProperties(properties);
Injector injector = startupInjectorBuilder.build();


@ -70,13 +70,12 @@ public class GoogleExportStorageProvider implements ExportStorageProvider
}
@Override
public StorageConnector get()
public StorageConnector createStorageConnector(File taskTempDir)
{
final String tempDir = googleExportConfig.getTempLocalDir();
if (tempDir == null) {
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
.ofCategory(DruidException.Category.NOT_FOUND)
.build("The runtime property `druid.export.storage.google.tempLocalDir` must be configured for GCS export.");
final String exportConfigTempDir = googleExportConfig.getTempLocalDir();
final File tempDirFile = exportConfigTempDir != null ? new File(exportConfigTempDir) : taskTempDir;
if (tempDirFile == null) {
throw DruidException.defensive("Couldn't find temporary directory for export.");
}
final List<String> allowedExportPaths = googleExportConfig.getAllowedExportPaths();
if (allowedExportPaths == null) {
@ -89,7 +88,7 @@ public class GoogleExportStorageProvider implements ExportStorageProvider
final GoogleOutputConfig googleOutputConfig = new GoogleOutputConfig(
bucket,
prefix,
new File(tempDir),
tempDirFile,
googleExportConfig.getChunkSize(),
googleExportConfig.getMaxRetry()
);


@ -37,6 +37,7 @@ public class GoogleOutputConfig
@JsonProperty
private final String prefix;
@Nullable
@JsonProperty
private final File tempDir;
@ -58,7 +59,7 @@ public class GoogleOutputConfig
public GoogleOutputConfig(
final String bucket,
final String prefix,
final File tempDir,
@Nullable final File tempDir,
@Nullable final HumanReadableBytes chunkSize,
@Nullable final Integer maxRetry
)
@ -82,6 +83,7 @@ public class GoogleOutputConfig
return prefix;
}
@Nullable
public File getTempDir()
{
return tempDir;
@ -97,6 +99,11 @@ public class GoogleOutputConfig
return maxRetry;
}
public GoogleOutputConfig withTempDir(File tempDir)
{
return new GoogleOutputConfig(bucket, prefix, tempDir, chunkSize, maxRetry);
}
private void validateFields()
{
if (chunkSize.getBytes() < GOOGLE_MIN_CHUNK_SIZE_BYTES || chunkSize.getBytes() > GOOGLE_MAX_CHUNK_SIZE_BYTES) {


@ -47,7 +47,7 @@ public class GoogleStorageConnectorProvider extends GoogleOutputConfig implement
public GoogleStorageConnectorProvider(
@JsonProperty(value = "bucket", required = true) String bucket,
@JsonProperty(value = "prefix", required = true) String prefix,
@JsonProperty(value = "tempDir", required = true) File tempDir,
@JsonProperty(value = "tempDir") @Nullable File tempDir,
@JsonProperty(value = "chunkSize") @Nullable HumanReadableBytes chunkSize,
@JsonProperty(value = "maxRetry") @Nullable Integer maxRetry
)
@ -56,8 +56,9 @@ public class GoogleStorageConnectorProvider extends GoogleOutputConfig implement
}
@Override
public StorageConnector get()
public StorageConnector createStorageConnector(File defaultTempDir)
{
return new GoogleStorageConnector(this, googleStorage, googleInputDataConfig);
GoogleOutputConfig config = this.getTempDir() == null ? this.withTempDir(defaultTempDir) : this;
return new GoogleStorageConnector(config, googleStorage, googleInputDataConfig);
}
}


@ -21,14 +21,18 @@ package org.apache.druid.storage.google.output;
import com.google.common.collect.ImmutableList;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.storage.StorageConnector;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.util.List;
public class GoogleExportStorageProviderTest
{
private final File tempDir = FileUtils.createTempDir();
private final List<String> validPrefixes = ImmutableList.of(
"gs://bucket-name/validPath1",
"gs://bucket-name/validPath2"
@ -39,7 +43,7 @@ public class GoogleExportStorageProviderTest
{
GoogleExportStorageProvider googleExportStorageProvider = new GoogleExportStorageProvider("bucket-name", "validPath1");
googleExportStorageProvider.googleExportConfig = new GoogleExportConfig("tempLocalDir", null, null, validPrefixes);
StorageConnector storageConnector = googleExportStorageProvider.get();
StorageConnector storageConnector = googleExportStorageProvider.createStorageConnector(tempDir);
Assert.assertNotNull(storageConnector);
Assert.assertTrue(storageConnector instanceof GoogleStorageConnector);


@ -26,9 +26,8 @@ import com.google.inject.Key;
import com.google.inject.ProvisionException;
import com.google.inject.name.Names;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.StartupInjectorBuilder;
import org.apache.druid.storage.StorageConnector;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.storage.StorageConnectorModule;
import org.apache.druid.storage.StorageConnectorProvider;
import org.apache.druid.storage.google.GoogleInputDataConfig;
@ -44,6 +43,7 @@ import java.util.Properties;
public class GoogleStorageConnectorProviderTest
{
private static final String CUSTOM_NAMESPACE = "custom";
private final File tempDir = FileUtils.createTempDir();
@Test
public void createGoogleStorageFactoryWithRequiredProperties()
@ -57,7 +57,7 @@ public class GoogleStorageConnectorProviderTest
StorageConnectorProvider googleStorageConnectorProvider = getStorageConnectorProvider(properties);
Assert.assertTrue(googleStorageConnectorProvider instanceof GoogleStorageConnectorProvider);
Assert.assertTrue(googleStorageConnectorProvider.get() instanceof GoogleStorageConnector);
Assert.assertTrue(googleStorageConnectorProvider.createStorageConnector(tempDir) instanceof GoogleStorageConnector);
Assert.assertEquals("bucket", ((GoogleStorageConnectorProvider) googleStorageConnectorProvider).getBucket());
Assert.assertEquals("prefix", ((GoogleStorageConnectorProvider) googleStorageConnectorProvider).getPrefix());
Assert.assertEquals(new File("/tmp"), ((GoogleStorageConnectorProvider) googleStorageConnectorProvider).getTempDir());
@ -124,10 +124,6 @@ public class GoogleStorageConnectorProviderTest
StorageConnectorProvider.class,
Names.named(CUSTOM_NAMESPACE)
);
binder.bind(Key.get(StorageConnector.class, Names.named(CUSTOM_NAMESPACE)))
.toProvider(Key.get(StorageConnectorProvider.class, Names.named(CUSTOM_NAMESPACE)))
.in(LazySingleton.class);
}
).withProperties(properties);


@ -35,6 +35,8 @@ import org.apache.druid.msq.querykit.QueryKitSpec;
import org.apache.druid.query.Query;
import org.apache.druid.server.DruidNode;
import java.io.File;
/**
* Context used by multi-stage query controllers. Useful because it allows test fixtures to provide their own
* implementations.
@ -104,6 +106,14 @@ public interface ControllerContext
WorkerFailureListener workerFailureListener
);
/**
* Fetch a directory for temporary outputs
*/
default File taskTempDir()
{
throw new UnsupportedOperationException();
}
/**
* Client for communicating with workers.
*/


@ -573,7 +573,7 @@ public class ControllerImpl implements Controller
final QueryDefinition queryDef = makeQueryDefinition(
context.makeQueryKitSpec(makeQueryControllerToolKit(), queryId, querySpec, queryKernelConfig),
querySpec,
context.jsonMapper(),
context,
resultsContext
);
@ -1566,26 +1566,7 @@ public class ControllerImpl implements Controller
} else if (MSQControllerTask.isExport(querySpec)) {
// Write manifest file.
ExportMSQDestination destination = (ExportMSQDestination) querySpec.getDestination();
ExportMetadataManager exportMetadataManager = new ExportMetadataManager(destination.getExportStorageProvider());
final StageId finalStageId = queryKernel.getStageId(queryDef.getFinalStageDefinition().getStageNumber());
//noinspection unchecked
Object resultObjectForStage = queryKernel.getResultObjectForStage(finalStageId);
if (!(resultObjectForStage instanceof List)) {
// This might occur if all workers are running on an older version. We are not able to write a manifest file in this case.
log.warn("Was unable to create manifest file due to ");
return;
}
@SuppressWarnings("unchecked")
List<String> exportedFiles = (List<String>) queryKernel.getResultObjectForStage(finalStageId);
log.info("Query [%s] exported %d files.", queryDef.getQueryId(), exportedFiles.size());
exportMetadataManager.writeMetadata(exportedFiles);
} else if (MSQControllerTask.isExport(querySpec)) {
// Write manifest file.
ExportMSQDestination destination = (ExportMSQDestination) querySpec.getDestination();
ExportMetadataManager exportMetadataManager = new ExportMetadataManager(destination.getExportStorageProvider());
ExportMetadataManager exportMetadataManager = new ExportMetadataManager(destination.getExportStorageProvider(), context.taskTempDir());
final StageId finalStageId = queryKernel.getStageId(queryDef.getFinalStageDefinition().getStageNumber());
//noinspection unchecked
@ -1734,10 +1715,11 @@ public class ControllerImpl implements Controller
private static QueryDefinition makeQueryDefinition(
final QueryKitSpec queryKitSpec,
final MSQSpec querySpec,
final ObjectMapper jsonMapper,
final ControllerContext controllerContext,
final ResultsContext resultsContext
)
{
final ObjectMapper jsonMapper = controllerContext.jsonMapper();
final MSQTuningConfig tuningConfig = querySpec.getTuningConfig();
final ColumnMappings columnMappings = querySpec.getColumnMappings();
final Query<?> queryToPlan;
@ -1855,7 +1837,7 @@ public class ControllerImpl implements Controller
try {
// Check that the export destination is empty as a sanity check. We want to avoid modifying any other files with export.
Iterator<String> filesIterator = exportStorageProvider.get().listDir("");
Iterator<String> filesIterator = exportStorageProvider.createStorageConnector(controllerContext.taskTempDir()).listDir("");
if (filesIterator.hasNext()) {
throw DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.RUNTIME_FAILURE)


@ -25,6 +25,7 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.storage.ExportStorageProvider;
import org.apache.druid.storage.StorageConnector;
import java.io.File;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
@ -42,15 +43,17 @@ public class ExportMetadataManager
public static final int MANIFEST_FILE_VERSION = 1;
private static final Logger log = new Logger(ExportMetadataManager.class);
private final ExportStorageProvider exportStorageProvider;
private final File tmpDir;
public ExportMetadataManager(final ExportStorageProvider exportStorageProvider)
public ExportMetadataManager(final ExportStorageProvider exportStorageProvider, final File tmpDir)
{
this.exportStorageProvider = exportStorageProvider;
this.tmpDir = tmpDir;
}
public void writeMetadata(List<String> exportedFiles) throws IOException
{
final StorageConnector storageConnector = exportStorageProvider.get();
final StorageConnector storageConnector = exportStorageProvider.createStorageConnector(tmpDir);
log.info("Writing manifest file at location [%s]", exportStorageProvider.getBasePath());
if (storageConnector.pathExists(MANIFEST_FILE) || storageConnector.pathExists(META_FILE)) {


@ -27,14 +27,12 @@ import com.google.inject.Key;
import com.google.inject.multibindings.Multibinder;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexing.overlord.duty.OverlordDuty;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.msq.indexing.cleaner.DurableStorageCleaner;
import org.apache.druid.msq.indexing.cleaner.DurableStorageCleanerConfig;
import org.apache.druid.storage.NilStorageConnector;
import org.apache.druid.storage.StorageConnector;
import org.apache.druid.storage.StorageConnectorProvider;
import java.util.List;
@ -84,10 +82,6 @@ public class MSQDurableStorageModule implements DruidModule
MultiStageQuery.class
);
binder.bind(Key.get(StorageConnector.class, MultiStageQuery.class))
.toProvider(Key.get(StorageConnectorProvider.class, MultiStageQuery.class))
.in(LazySingleton.class);
if (nodeRoles.contains(NodeRole.OVERLORD)) {
JsonConfigProvider.bind(
binder,
@ -99,11 +93,9 @@ public class MSQDurableStorageModule implements DruidModule
.addBinding()
.to(DurableStorageCleaner.class);
}
} else if (nodeRoles.contains(NodeRole.BROKER)) {
// bind with nil implementation so that configs are not required during service startups of broker since SQLStatementResource uses it.
binder.bind(Key.get(StorageConnector.class, MultiStageQuery.class)).toInstance(NilStorageConnector.getInstance());
} else {
// do nothing
// bind with nil implementation so that configs are not required during service startups.
binder.bind(Key.get(StorageConnectorProvider.class, MultiStageQuery.class)).toInstance(tempDir -> NilStorageConnector.getInstance());
}
}
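Since `StorageConnectorProvider` no longer extends Guice's `Provider` and is left with a single abstract method (see the interface change below), a lambda satisfies it, which is what the nil binding above and several tests in this PR rely on. A minimal sketch:

```java
import org.apache.druid.storage.NilStorageConnector;
import org.apache.druid.storage.StorageConnectorProvider;

class NilBindingSketch
{
    // The nil connector ignores the temp directory argument entirely.
    static final StorageConnectorProvider NIL = tempDir -> NilStorageConnector.getInstance();
}
```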


@ -40,6 +40,7 @@ import org.apache.druid.msq.exec.SegmentSource;
import org.apache.druid.msq.exec.WorkerClient;
import org.apache.druid.msq.exec.WorkerFailureListener;
import org.apache.druid.msq.exec.WorkerManager;
import org.apache.druid.msq.guice.MultiStageQuery;
import org.apache.druid.msq.indexing.client.ControllerChatHandler;
import org.apache.druid.msq.indexing.client.IndexerWorkerClient;
import org.apache.druid.msq.indexing.error.MSQException;
@ -59,7 +60,10 @@ import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.realtime.ChatHandler;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.storage.StorageConnector;
import org.apache.druid.storage.StorageConnectorProvider;
import java.io.File;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@ -91,12 +95,16 @@ public class IndexerControllerContext implements ControllerContext
{
this.task = task;
this.toolbox = toolbox;
this.injector = injector;
this.clientFactory = clientFactory;
this.overlordClient = overlordClient;
this.metricBuilder = new ServiceMetricEvent.Builder();
this.memoryIntrospector = injector.getInstance(MemoryIntrospector.class);
IndexTaskUtils.setTaskDimensions(metricBuilder, task);
final StorageConnectorProvider storageConnectorProvider = injector.getInstance(Key.get(StorageConnectorProvider.class, MultiStageQuery.class));
final StorageConnector storageConnector = storageConnectorProvider.createStorageConnector(toolbox.getIndexingTmpDir());
this.injector = injector.createChildInjector(
binder -> binder.bind(Key.get(StorageConnector.class, MultiStageQuery.class))
.toInstance(storageConnector));
}
@Override
@ -212,6 +220,12 @@ public class IndexerControllerContext implements ControllerContext
);
}
@Override
public File taskTempDir()
{
return toolbox.getIndexingTmpDir();
}
@Override
public QueryKitSpec makeQueryKitSpec(
final QueryKit<Query<?>> queryKit,


@ -43,6 +43,7 @@ import org.apache.druid.msq.exec.WorkerClient;
import org.apache.druid.msq.exec.WorkerContext;
import org.apache.druid.msq.exec.WorkerMemoryParameters;
import org.apache.druid.msq.exec.WorkerStorageParameters;
import org.apache.druid.msq.guice.MultiStageQuery;
import org.apache.druid.msq.indexing.client.IndexerControllerClient;
import org.apache.druid.msq.indexing.client.IndexerWorkerClient;
import org.apache.druid.msq.indexing.client.WorkerChatHandler;
@ -61,6 +62,8 @@ import org.apache.druid.rpc.indexing.SpecificTaskServiceLocator;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.server.DruidNode;
import org.apache.druid.storage.StorageConnector;
import org.apache.druid.storage.StorageConnectorProvider;
import java.io.File;
import java.util.concurrent.ExecutorService;
@ -106,7 +109,6 @@ public class IndexerWorkerContext implements WorkerContext
{
this.task = task;
this.toolbox = toolbox;
this.injector = injector;
this.overlordClient = overlordClient;
this.indexIO = indexIO;
this.dataSegmentProvider = dataSegmentProvider;
@ -121,6 +123,11 @@ public class IndexerWorkerContext implements WorkerContext
IndexerControllerContext.DEFAULT_MAX_CONCURRENT_STAGES
);
this.includeAllCounters = MultiStageQueryContext.getIncludeAllCounters(queryContext);
final StorageConnectorProvider storageConnectorProvider = injector.getInstance(Key.get(StorageConnectorProvider.class, MultiStageQuery.class));
final StorageConnector storageConnector = storageConnectorProvider.createStorageConnector(toolbox.getIndexingTmpDir());
this.injector = injector.createChildInjector(
binder -> binder.bind(Key.get(StorageConnector.class, MultiStageQuery.class))
.toInstance(storageConnector));
}
public static IndexerWorkerContext createProductionInstance(


@ -32,6 +32,7 @@ import org.apache.druid.indexing.overlord.duty.OverlordDuty;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.guice.MultiStageQuery;
import org.apache.druid.storage.StorageConnector;
import org.apache.druid.storage.StorageConnectorProvider;
import java.util.HashSet;
import java.util.Iterator;
@ -55,12 +56,12 @@ public class DurableStorageCleaner implements OverlordDuty
@Inject
public DurableStorageCleaner(
final DurableStorageCleanerConfig config,
final @MultiStageQuery StorageConnector storageConnector,
final @MultiStageQuery StorageConnectorProvider storageConnectorProvider,
@JacksonInject final Provider<TaskMaster> taskMasterProvider
)
{
this.config = config;
this.storageConnector = storageConnector;
this.storageConnector = storageConnectorProvider.createStorageConnector(null);
this.taskMasterProvider = taskMasterProvider;
}


@ -160,7 +160,7 @@ public class ExportResultsFrameProcessorFactory implements FrameProcessorFactory
readableInput.getChannel(),
exportFormat,
readableInput.getChannelFrameReader(),
exportStorageProvider.get(),
exportStorageProvider.createStorageConnector(frameContext.tempDir()),
frameContext.jsonMapper(),
channelCounter,
getExportFilePath(queryId, workerNumber, readableInput.getStagePartition().getPartitionNumber(), exportFormat),


@ -91,6 +91,7 @@ import org.apache.druid.sql.http.SqlQuery;
import org.apache.druid.sql.http.SqlResource;
import org.apache.druid.storage.NilStorageConnector;
import org.apache.druid.storage.StorageConnector;
import org.apache.druid.storage.StorageConnectorProvider;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import javax.servlet.http.HttpServletRequest;
@ -134,14 +135,14 @@ public class SqlStatementResource
final @MultiStageQuery SqlStatementFactory msqSqlStatementFactory,
final ObjectMapper jsonMapper,
final OverlordClient overlordClient,
final @MultiStageQuery StorageConnector storageConnector,
final @MultiStageQuery StorageConnectorProvider storageConnectorProvider,
final AuthorizerMapper authorizerMapper
)
{
this.msqSqlStatementFactory = msqSqlStatementFactory;
this.jsonMapper = jsonMapper;
this.overlordClient = overlordClient;
this.storageConnector = storageConnector;
this.storageConnector = storageConnectorProvider.createStorageConnector(null);
this.authorizerMapper = authorizerMapper;
}


@ -29,6 +29,7 @@ import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.duty.DutySchedule;
import org.apache.druid.msq.indexing.cleaner.DurableStorageCleaner;
import org.apache.druid.msq.indexing.cleaner.DurableStorageCleanerConfig;
import org.apache.druid.storage.NilStorageConnector;
import org.apache.druid.storage.StorageConnector;
import org.easymock.Capture;
import org.easymock.EasyMock;
@ -59,7 +60,7 @@ public class DurableStorageCleanerTest
durableStorageCleanerConfig.enabled = true;
durableStorageCleaner = new DurableStorageCleaner(
durableStorageCleanerConfig,
STORAGE_CONNECTOR,
s -> STORAGE_CONNECTOR,
() -> TASK_MASTER
);
}
@ -126,7 +127,7 @@ public class DurableStorageCleanerTest
DurableStorageCleanerConfig cleanerConfig = new DurableStorageCleanerConfig();
cleanerConfig.delaySeconds = 10L;
cleanerConfig.enabled = true;
DurableStorageCleaner durableStorageCleaner = new DurableStorageCleaner(cleanerConfig, null, null);
DurableStorageCleaner durableStorageCleaner = new DurableStorageCleaner(cleanerConfig, (temp) -> NilStorageConnector.getInstance(), null);
DutySchedule schedule = durableStorageCleaner.getSchedule();
Assert.assertEquals(cleanerConfig.delaySeconds * 1000, schedule.getPeriodMillis());


@ -22,12 +22,16 @@ package org.apache.druid.msq.indexing;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.inject.Injector;
import com.google.inject.Key;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.msq.exec.Worker;
import org.apache.druid.msq.guice.MultiStageQuery;
import org.apache.druid.rpc.ServiceLocation;
import org.apache.druid.rpc.ServiceLocations;
import org.apache.druid.rpc.ServiceLocator;
import org.apache.druid.storage.NilStorageConnector;
import org.apache.druid.storage.StorageConnectorProvider;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@ -45,6 +49,8 @@ public class IndexerWorkerContextTest
final Injector injectorMock = Mockito.mock(Injector.class);
Mockito.when(injectorMock.getInstance(SegmentCacheManagerFactory.class))
.thenReturn(Mockito.mock(SegmentCacheManagerFactory.class));
Mockito.when(injectorMock.getInstance(Key.get(StorageConnectorProvider.class, MultiStageQuery.class)))
.thenReturn(defaultTempDir -> NilStorageConnector.getInstance());
final MSQWorkerTask task =
Mockito.mock(MSQWorkerTask.class, Mockito.withSettings().strictness(Strictness.STRICT_STUBS));


@ -54,7 +54,6 @@ import org.junit.jupiter.api.Test;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
@ -76,7 +75,7 @@ public class SqlMSQStatementResourcePostTest extends MSQTestBase
sqlStatementFactory,
objectMapper,
indexingServiceClient,
localFileStorageConnector,
s -> localFileStorageConnector,
authorizerMapper
);
}
@ -330,7 +329,7 @@ public class SqlMSQStatementResourcePostTest extends MSQTestBase
sqlStatementFactory,
objectMapper,
indexingServiceClient,
NilStorageConnector.getInstance(),
s -> NilStorageConnector.getInstance(),
authorizerMapper
);


@ -81,7 +81,6 @@ import org.apache.druid.sql.calcite.planner.ColumnMappings;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.http.ResultFormat;
import org.apache.druid.sql.http.SqlResourceTest;
import org.apache.druid.storage.local.LocalFileStorageConnector;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
@ -685,7 +684,7 @@ public class SqlStatementResourceTest extends MSQTestBase
}
@BeforeEach
public void init() throws Exception
public void init()
{
overlordClient = Mockito.mock(OverlordClient.class);
setupMocks(overlordClient);
@ -693,7 +692,7 @@ public class SqlStatementResourceTest extends MSQTestBase
sqlStatementFactory,
objectMapper,
overlordClient,
new LocalFileStorageConnector(newTempFolder("local")),
tempDir -> localFileStorageConnector,
authorizerMapper
);
}


@ -40,6 +40,7 @@ import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
@ -72,6 +73,7 @@ import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import javax.annotation.Nullable;
import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -92,6 +94,7 @@ public class MSQTestControllerContext implements ControllerContext
NUM_WORKERS,
"MultiStageQuery-test-controller-client"
));
private final File tempDir = FileUtils.createTempDir();
private final CoordinatorClient coordinatorClient;
private final DruidNode node = new DruidNode(
"controller",
@ -360,6 +363,12 @@ public class MSQTestControllerContext implements ControllerContext
);
}
@Override
public File taskTempDir()
{
return tempDir;
}
@Override
public void registerController(Controller controller, Closer closer)
{


@ -39,7 +39,7 @@ public class S3ExportConfig
@JsonCreator
public S3ExportConfig(
@JsonProperty("tempLocalDir") final String tempLocalDir,
@JsonProperty("tempLocalDir") @Nullable final String tempLocalDir,
@JsonProperty("chunkSize") @Nullable final HumanReadableBytes chunkSize,
@JsonProperty("maxRetry") @Nullable final Integer maxRetry,
@JsonProperty("allowedExportPaths") final List<String> allowedExportPaths)
@ -50,6 +50,7 @@ public class S3ExportConfig
this.allowedExportPaths = allowedExportPaths;
}
@Nullable
public String getTempLocalDir()
{
return tempLocalDir;


@ -69,14 +69,14 @@ public class S3ExportStorageProvider implements ExportStorageProvider
this.prefix = prefix;
}
@Override
public StorageConnector get()
public StorageConnector createStorageConnector(File taskTempDir)
{
final String tempDir = s3ExportConfig.getTempLocalDir();
if (tempDir == null) {
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
.ofCategory(DruidException.Category.NOT_FOUND)
.build("The runtime property `druid.export.storage.s3.tempLocalDir` must be configured for S3 export.");
final String exportConfigTempDir = s3ExportConfig.getTempLocalDir();
final File tempDirFile = exportConfigTempDir != null ? new File(exportConfigTempDir) : taskTempDir;
if (tempDirFile == null) {
throw DruidException.defensive("Couldn't find temporary directory for export.");
}
final List<String> allowedExportPaths = s3ExportConfig.getAllowedExportPaths();
if (allowedExportPaths == null) {
@ -89,7 +89,7 @@ public class S3ExportStorageProvider implements ExportStorageProvider
final S3OutputConfig s3OutputConfig = new S3OutputConfig(
bucket,
prefix,
new File(tempDir),
tempDirFile,
s3ExportConfig.getChunkSize(),
s3ExportConfig.getMaxRetry()
);


@ -40,6 +40,7 @@ public class S3OutputConfig
@JsonProperty
private String prefix;
@Nullable
@JsonProperty
private File tempDir;
@ -57,7 +58,7 @@ public class S3OutputConfig
public S3OutputConfig(
@JsonProperty(value = "bucket", required = true) String bucket,
@JsonProperty(value = "prefix", required = true) String prefix,
@JsonProperty(value = "tempDir", required = true) File tempDir,
@JsonProperty(value = "tempDir") @Nullable File tempDir,
@JsonProperty("chunkSize") HumanReadableBytes chunkSize,
@JsonProperty("maxRetry") Integer maxRetry
)
@ -69,6 +70,7 @@ public class S3OutputConfig
protected S3OutputConfig(
String bucket,
String prefix,
@Nullable
File tempDir,
@Nullable
HumanReadableBytes chunkSize,
@ -120,6 +122,7 @@ public class S3OutputConfig
return prefix;
}
@Nullable
public File getTempDir()
{
return tempDir;
@ -135,6 +138,11 @@ public class S3OutputConfig
return maxRetry;
}
public S3OutputConfig withTempDir(File tempDir)
{
return new S3OutputConfig(bucket, prefix, tempDir, chunkSize, maxRetry);
}
private static void validateChunkSize(long chunkSize)
{
if (S3_MULTIPART_UPLOAD_MAX_PART_SIZE_BYTES < chunkSize) {


@ -30,6 +30,7 @@ import org.apache.druid.storage.StorageConnectorProvider;
import org.apache.druid.storage.s3.S3StorageDruidModule;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
import javax.annotation.Nullable;
import java.io.File;
@JsonTypeName(S3StorageDruidModule.SCHEME)
@ -45,7 +46,7 @@ public class S3StorageConnectorProvider extends S3OutputConfig implements Storag
public S3StorageConnectorProvider(
@JsonProperty(value = "bucket", required = true) String bucket,
@JsonProperty(value = "prefix", required = true) String prefix,
@JsonProperty(value = "tempDir", required = true) File tempDir,
@JsonProperty(value = "tempDir") @Nullable File tempDir,
@JsonProperty("chunkSize") HumanReadableBytes chunkSize,
@JsonProperty("maxRetry") Integer maxRetry
)
@ -54,8 +55,9 @@ public class S3StorageConnectorProvider extends S3OutputConfig implements Storag
}
@Override
public StorageConnector get()
public StorageConnector createStorageConnector(File defaultTempDir)
{
return new S3StorageConnector(this, s3, s3UploadManager);
S3OutputConfig config = this.getTempDir() == null ? this.withTempDir(defaultTempDir) : this;
return new S3StorageConnector(config, s3, s3UploadManager);
}
}


@ -29,12 +29,11 @@ import com.google.inject.ProvisionException;
import com.google.inject.name.Names;
import org.apache.druid.common.aws.AWSModule;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.StartupInjectorBuilder;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.query.DruidProcessingConfigTest;
import org.apache.druid.storage.StorageConnector;
import org.apache.druid.storage.StorageConnectorModule;
import org.apache.druid.storage.StorageConnectorProvider;
import org.apache.druid.storage.s3.output.S3ExportConfig;
@ -54,6 +53,7 @@ public class S3StorageConnectorProviderTest
{
private static final String CUSTOM_NAMESPACE = "custom";
private final File tempDir = FileUtils.createTempDir();
@Test
public void createS3StorageFactoryWithRequiredProperties()
@ -67,7 +67,7 @@ public class S3StorageConnectorProviderTest
StorageConnectorProvider s3StorageConnectorProvider = getStorageConnectorProvider(properties);
Assert.assertTrue(s3StorageConnectorProvider instanceof S3StorageConnectorProvider);
Assert.assertTrue(s3StorageConnectorProvider.get() instanceof S3StorageConnector);
Assert.assertTrue(s3StorageConnectorProvider.createStorageConnector(tempDir) instanceof S3StorageConnector);
Assert.assertEquals("bucket", ((S3StorageConnectorProvider) s3StorageConnectorProvider).getBucket());
Assert.assertEquals("prefix", ((S3StorageConnectorProvider) s3StorageConnectorProvider).getPrefix());
Assert.assertEquals(new File("/tmp"), ((S3StorageConnectorProvider) s3StorageConnectorProvider).getTempDir());
@ -115,9 +115,9 @@ public class S3StorageConnectorProviderTest
properties.setProperty(CUSTOM_NAMESPACE + ".prefix", "prefix");
Assert.assertThrows(
"Missing required creator property 'tempDir'",
ProvisionException.class,
() -> getStorageConnectorProvider(properties)
"tempDir is null in s3 config",
NullPointerException.class,
() -> getStorageConnectorProvider(properties).createStorageConnector(null)
);
}
@ -138,10 +138,6 @@ public class S3StorageConnectorProviderTest
StorageConnectorProvider.class,
Names.named(CUSTOM_NAMESPACE)
);
binder.bind(Key.get(StorageConnector.class, Names.named(CUSTOM_NAMESPACE)))
.toProvider(Key.get(StorageConnectorProvider.class, Names.named(CUSTOM_NAMESPACE)))
.in(LazySingleton.class);
}
}
).withProperties(properties);


@ -97,20 +97,6 @@ public class S3OutputSerdeTest
MAPPER.readValue(json, S3OutputConfig.class);
}
@Test
public void noTempDir() throws JsonProcessingException
{
String json = jsonStringReadyForAssert("{\n"
+ " \"prefix\": \"abc\",\n"
+ " \"bucket\": \"TEST\",\n"
+ " \"chunkSize\":104857600,\n"
+ " \"maxRetry\": 2\n"
+ "}\n");
expectedException.expect(MismatchedInputException.class);
expectedException.expectMessage("Missing required creator property 'tempDir'");
MAPPER.readValue(json, S3OutputConfig.class);
}
@Test
public void leastArguments() throws JsonProcessingException
{


@ -20,10 +20,11 @@
package org.apache.druid.storage;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.inject.Provider;
import java.io.File;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
public interface ExportStorageProvider extends Provider<StorageConnector>
public interface ExportStorageProvider
{
String getResourceType();
@ -33,4 +34,6 @@ public interface ExportStorageProvider extends Provider<StorageConnector>
String getBasePath();
String getFilePathForManifest(String fileName);
StorageConnector createStorageConnector(File taskTempDir);
}


@ -20,9 +20,20 @@
package org.apache.druid.storage;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.inject.Provider;
import java.io.File;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
public interface StorageConnectorProvider extends Provider<StorageConnector>
public interface StorageConnectorProvider
{
/**
* Returns the storage connector. Takes a defaultTempDir parameter that may be used as the temporary directory, if the
* storage connector requires one. The provider is not guaranteed to use this value even when it does need a temporary
* directory, since priority is given to a directory configured explicitly as a runtime property.
* <br>
* This value must be passed in rather than injected by Jackson, because the default temporary directory depends on the
* task id, and such dynamic task-specific bindings are not possible on indexers.
*/
StorageConnector createStorageConnector(File defaultTempDir);
}
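A hedged usage sketch of the new contract (assuming the provider is obtained through Guice and the task directory comes from the task toolbox, as `IndexerControllerContext` does above):

```java
import java.io.File;

import org.apache.druid.storage.StorageConnector;
import org.apache.druid.storage.StorageConnectorProvider;

class ConnectorUsage
{
    // On tasks, taskTempDir is e.g. toolbox.getIndexingTmpDir(); services
    // without a task directory (overlord, broker) pass null and therefore
    // must rely on an explicitly configured tempDir.
    static StorageConnector open(StorageConnectorProvider provider, File taskTempDir)
    {
        return provider.createStorageConnector(taskTempDir);
    }
}
```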


@ -54,7 +54,7 @@ public class LocalFileExportStorageProvider implements ExportStorageProvider
}
@Override
public StorageConnector get()
public StorageConnector createStorageConnector(File taskTempDir)
{
final File exportDestination = validateAndGetPath(storageConfig.getBaseDir(), exportPath);
try {


@ -45,7 +45,7 @@ public class LocalFileStorageConnectorProvider implements StorageConnectorProvid
}
@Override
public StorageConnector get()
public StorageConnector createStorageConnector(File defaultTempDir)
{
try {
return new LocalFileStorageConnector(basePath);


@ -46,8 +46,9 @@ public class StorageConnectorModuleTest
public void testJsonSerde() throws JsonProcessingException
{
StorageConnectorProvider storageConnectorProvider = objectMapper.readValue(JSON, StorageConnectorProvider.class);
Assert.assertTrue(storageConnectorProvider.get() instanceof LocalFileStorageConnector);
Assert.assertEquals(new File("/tmp"), ((LocalFileStorageConnector) storageConnectorProvider.get()).getBasePath());
StorageConnector storageConnector = storageConnectorProvider.createStorageConnector(new File("/tmp/tmpDir"));
Assert.assertTrue(storageConnector instanceof LocalFileStorageConnector);
Assert.assertEquals(new File("/tmp"), ((LocalFileStorageConnector) storageConnector).getBasePath());
}


@ -49,14 +49,14 @@ public class LocalFileStorageConnectorTest
@Rule
public ExpectedException expectedException = ExpectedException.none();
private File tempDir;
private File storageDir;
private StorageConnector storageConnector;
@Before
public void init() throws IOException
{
tempDir = temporaryFolder.newFolder();
storageConnector = new LocalFileStorageConnectorProvider(tempDir).get();
storageDir = temporaryFolder.newFolder();
storageConnector = new LocalFileStorageConnectorProvider(storageDir).createStorageConnector(null);
}
@Test
@ -69,14 +69,14 @@ public class LocalFileStorageConnectorTest
// check if file is created
Assert.assertTrue(storageConnector.pathExists(uuid));
Assert.assertTrue(new File(tempDir.getAbsolutePath(), uuid).exists());
Assert.assertTrue(new File(storageDir.getAbsolutePath(), uuid).exists());
// check contents
checkContents(uuid);
// delete file
storageConnector.deleteFile(uuid);
Assert.assertFalse(new File(tempDir.getAbsolutePath(), uuid).exists());
Assert.assertFalse(new File(storageDir.getAbsolutePath(), uuid).exists());
}
@Test
@ -96,14 +96,14 @@ public class LocalFileStorageConnectorTest
checkContents(uuid1);
checkContents(uuid2);
File baseFile = new File(tempDir.getAbsolutePath(), uuid_base);
File baseFile = new File(storageDir.getAbsolutePath(), uuid_base);
Assert.assertTrue(baseFile.exists());
Assert.assertTrue(baseFile.isDirectory());
Assert.assertEquals(2, baseFile.listFiles().length);
storageConnector.deleteRecursively(uuid_base);
Assert.assertFalse(baseFile.exists());
Assert.assertTrue(new File(tempDir.getAbsolutePath(), topLevelDir).exists());
Assert.assertTrue(new File(storageDir.getAbsolutePath(), topLevelDir).exists());
}
@Test
@ -118,8 +118,8 @@ public class LocalFileStorageConnectorTest
// delete file
storageConnector.deleteFiles(ImmutableList.of(uuid1, uuid2));
Assert.assertFalse(new File(tempDir.getAbsolutePath(), uuid1).exists());
Assert.assertFalse(new File(tempDir.getAbsolutePath(), uuid2).exists());
Assert.assertFalse(new File(storageDir.getAbsolutePath(), uuid1).exists());
Assert.assertFalse(new File(storageDir.getAbsolutePath(), uuid2).exists());
}
@Test
@ -128,7 +128,7 @@ public class LocalFileStorageConnectorTest
File file = temporaryFolder.newFile();
expectedException.expect(IAE.class);
StorageConnectorProvider storageConnectorProvider = new LocalFileStorageConnectorProvider(file);
storageConnectorProvider.get();
storageConnectorProvider.createStorageConnector(null);
}
@Test