Optimize S3 storage writing for MSQ durable storage (#16481)

* Optimise S3 storage writing for MSQ durable storage

* Get rid of static ConcurrentHashMap

* Fix static checks

* Fix tests

* Remove unused constructor parameter chunkValidation + relevant cleanup

* Assert etags as String instead of Integer

* Fix flaky test

* Inject executor service

* Make threadpool size dynamic based on number of cores

* Fix S3StorageDruidModuleTest

* Fix S3StorageConnectorProviderTest

* Fix injection issues

* Add S3UploadConfig to manage maximum number of concurrent chunks dynamically based on chunk size

* Address the minor review comments

* Refactor S3UploadConfig + ExecutorService into S3UploadManager

* Address review comments

* Make updateChunkSizeIfGreater() synchronized instead of recomputeMaxConcurrentNumChunks()

* Address the minor review comments

* Fix intellij-inspections check

* Refactor code to use futures for maxNumConcurrentChunks. Also use executor service with blocking queue for backpressure semantics.

* Update javadoc

* Get rid of cyclic dependency injection between S3UploadManager and S3OutputConfig

* Fix RetryableS3OutputStreamTest

* Remove unnecessary synchronization parts from RetryableS3OutputStream

* Update javadoc

* Add S3UploadManagerTest

* Revert back to S3StorageConnectorProvider extends S3OutputConfig

* Address Karan's review comments

* Address Kashif's review comments

* Change a log message to debug

* Address review comments

* Fix intellij-inspections check

* Fix checkstyle

---------

Co-authored-by: asdf2014 <asdf2014@apache.org>
This commit is contained in:
Akshat Jain 2024-06-07 11:33:16 +05:30 committed by GitHub
parent e9f723344b
commit 03a38be446
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 434 additions and 157 deletions

View File

@ -56,15 +56,14 @@ public class AzureStorageConnectorProviderTest
properties.setProperty(CUSTOM_NAMESPACE + ".container", "container");
properties.setProperty(CUSTOM_NAMESPACE + ".prefix", "prefix");
properties.setProperty(CUSTOM_NAMESPACE + ".tempDir", "/tmp");
StorageConnectorProvider s3StorageConnectorProvider = getStorageConnectorProvider(properties);
StorageConnectorProvider storageConnectorProvider = getStorageConnectorProvider(properties);
assertInstanceOf(AzureStorageConnectorProvider.class, s3StorageConnectorProvider);
assertInstanceOf(AzureStorageConnector.class, s3StorageConnectorProvider.get());
assertEquals("container", ((AzureStorageConnectorProvider) s3StorageConnectorProvider).getContainer());
assertEquals("prefix", ((AzureStorageConnectorProvider) s3StorageConnectorProvider).getPrefix());
assertInstanceOf(AzureStorageConnectorProvider.class, storageConnectorProvider);
assertInstanceOf(AzureStorageConnector.class, storageConnectorProvider.get());
assertEquals("container", ((AzureStorageConnectorProvider) storageConnectorProvider).getContainer());
assertEquals("prefix", ((AzureStorageConnectorProvider) storageConnectorProvider).getPrefix());
assertEquals(new File("/tmp"),
((AzureStorageConnectorProvider) s3StorageConnectorProvider).getTempDir());
((AzureStorageConnectorProvider) storageConnectorProvider).getTempDir());
}
@Test
@ -72,7 +71,7 @@ public class AzureStorageConnectorProviderTest
{
final Properties properties = new Properties();
properties.setProperty(CUSTOM_NAMESPACE + ".type", "s3");
properties.setProperty(CUSTOM_NAMESPACE + ".type", "azure");
properties.setProperty(CUSTOM_NAMESPACE + ".container", "container");
properties.setProperty(CUSTOM_NAMESPACE + ".tempDir", "/tmp");
assertThrows(

View File

@ -19,16 +19,12 @@
package org.apache.druid.storage.s3.output;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.io.CountingOutputStream;
import it.unimi.dsi.fastutil.io.FastBufferedOutputStream;
@ -49,6 +45,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
@ -81,7 +78,6 @@ public class RetryableS3OutputStream extends OutputStream
private final File chunkStorePath;
private final long chunkSize;
private final List<PartETag> pushResults = new ArrayList<>();
private final byte[] singularBuffer = new byte[1];
// metric
@ -89,12 +85,6 @@ public class RetryableS3OutputStream extends OutputStream
private Chunk currentChunk;
private int nextChunkId = 1; // multipart upload requires partNumber to be in the range between 1 and 10000
private int numChunksPushed;
/**
* Total size of all chunks. This size is updated whenever the chunk is ready for push,
* not when {@link #write(byte[], int, int)} is called.
*/
private long resultsSize;
/**
* A flag indicating whether there was an upload error.
@ -103,27 +93,28 @@ public class RetryableS3OutputStream extends OutputStream
private boolean error;
private boolean closed;
/**
* Helper class for calculating maximum number of simultaneous chunks allowed on local disk.
*/
private final S3UploadManager uploadManager;
/**
* A list of futures to allow us to wait for completion of all uploadPart() calls
* before hitting {@link ServerSideEncryptingAmazonS3#completeMultipartUpload(CompleteMultipartUploadRequest)}.
*/
private final List<Future<UploadPartResult>> futures = new ArrayList<>();
public RetryableS3OutputStream(
S3OutputConfig config,
ServerSideEncryptingAmazonS3 s3,
String s3Key
) throws IOException
{
this(config, s3, s3Key, true);
}
@VisibleForTesting
protected RetryableS3OutputStream(
S3OutputConfig config,
ServerSideEncryptingAmazonS3 s3,
String s3Key,
boolean chunkValidation
S3UploadManager uploadManager
) throws IOException
{
this.config = config;
this.s3 = s3;
this.s3Key = s3Key;
this.uploadManager = uploadManager;
final InitiateMultipartUploadResult result;
try {
@ -138,9 +129,7 @@ public class RetryableS3OutputStream extends OutputStream
this.chunkStorePath = new File(config.getTempDir(), uploadId + UUID.randomUUID());
FileUtils.mkdirp(this.chunkStorePath);
this.chunkSize = config.getChunkSize();
this.pushStopwatch = Stopwatch.createUnstarted();
this.pushStopwatch.reset();
this.pushStopwatch = Stopwatch.createStarted();
this.currentChunk = new Chunk(nextChunkId, new File(chunkStorePath, String.valueOf(nextChunkId++)));
}
@ -172,7 +161,6 @@ public class RetryableS3OutputStream extends OutputStream
while (remainingBytesToWrite > 0) {
final int writtenBytes = writeToCurrentChunk(b, offsetToWrite, remainingBytesToWrite);
if (currentChunk.length() >= chunkSize) {
pushCurrentChunk();
currentChunk = new Chunk(nextChunkId, new File(chunkStorePath, String.valueOf(nextChunkId++)));
@ -199,62 +187,11 @@ public class RetryableS3OutputStream extends OutputStream
{
currentChunk.close();
final Chunk chunk = currentChunk;
try {
if (chunk.length() > 0) {
resultsSize += chunk.length();
pushStopwatch.start();
pushResults.add(push(chunk));
pushStopwatch.stop();
numChunksPushed++;
}
}
finally {
if (!chunk.delete()) {
LOG.warn("Failed to delete chunk [%s]", chunk.getAbsolutePath());
}
}
}
private PartETag push(Chunk chunk) throws IOException
{
try {
return RetryUtils.retry(
() -> uploadPartIfPossible(uploadId, config.getBucket(), s3Key, chunk),
S3Utils.S3RETRY,
config.getMaxRetry()
futures.add(
uploadManager.queueChunkForUpload(s3, s3Key, chunk.id, chunk.file, uploadId, config)
);
}
catch (AmazonServiceException e) {
throw new IOException(e);
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
private PartETag uploadPartIfPossible(
String uploadId,
String bucket,
String key,
Chunk chunk
)
{
final ObjectMetadata objectMetadata = new ObjectMetadata();
objectMetadata.setContentLength(resultsSize);
final UploadPartRequest uploadPartRequest = new UploadPartRequest()
.withUploadId(uploadId)
.withBucketName(bucket)
.withKey(key)
.withFile(chunk.file)
.withPartNumber(chunk.id)
.withPartSize(chunk.length());
if (LOG.isDebugEnabled()) {
LOG.debug("Pushing chunk [%s] to bucket[%s] and key[%s].", chunk, bucket, key);
}
UploadPartResult uploadResult = s3.uploadPart(uploadPartRequest);
return uploadResult.getPartETag();
}
@Override
@ -268,20 +205,47 @@ public class RetryableS3OutputStream extends OutputStream
// Closeables are closed in LIFO order
closer.register(() -> {
org.apache.commons.io.FileUtils.forceDelete(chunkStorePath);
LOG.info("Deleted chunkStorePath[%s]", chunkStorePath);
// This should be emitted as a metric
long totalChunkSize = (currentChunk.id - 1) * chunkSize + currentChunk.length();
LOG.info(
"Pushed total [%d] parts containing [%d] bytes in [%d]ms.",
numChunksPushed,
resultsSize,
futures.size(),
totalChunkSize,
pushStopwatch.elapsed(TimeUnit.MILLISECONDS)
);
});
closer.register(() -> org.apache.commons.io.FileUtils.forceDelete(chunkStorePath));
try (Closer ignored = closer) {
if (!error) {
pushCurrentChunk();
completeMultipartUpload();
}
}
}
closer.register(() -> {
private void completeMultipartUpload()
{
final List<PartETag> pushResults = new ArrayList<>();
for (Future<UploadPartResult> future : futures) {
if (error) {
future.cancel(true);
}
try {
if (resultsSize > 0 && isAllPushSucceeded()) {
UploadPartResult result = future.get(1, TimeUnit.HOURS);
pushResults.add(result.getPartETag());
}
catch (Exception e) {
error = true;
LOG.error(e, "Error in uploading part for upload ID [%s]", uploadId);
}
}
try {
boolean isAllPushSucceeded = !error && !pushResults.isEmpty() && futures.size() == pushResults.size();
if (isAllPushSucceeded) {
RetryUtils.retry(
() -> s3.completeMultipartUpload(
new CompleteMultipartUploadRequest(config.getBucket(), s3Key, uploadId, pushResults)
@ -301,20 +265,8 @@ public class RetryableS3OutputStream extends OutputStream
}
}
catch (Exception e) {
throw new IOException(e);
throw new RuntimeException(e);
}
});
try (Closer ignored = closer) {
if (!error) {
pushCurrentChunk();
}
}
}
private boolean isAllPushSucceeded()
{
return !error && !pushResults.isEmpty() && numChunksPushed == pushResults.size();
}
private static class Chunk implements Closeable
@ -336,16 +288,6 @@ public class RetryableS3OutputStream extends OutputStream
return outputStream.getCount();
}
private boolean delete()
{
return file.delete();
}
private String getAbsolutePath()
{
return file.getAbsolutePath();
}
@Override
public boolean equals(Object o)
{

View File

@ -56,6 +56,9 @@ public class S3ExportStorageProvider implements ExportStorageProvider
@JacksonInject
ServerSideEncryptingAmazonS3 s3;
@JacksonInject
S3UploadManager s3UploadManager;
@JsonCreator
public S3ExportStorageProvider(
@JsonProperty(value = "bucket", required = true) String bucket,
@ -90,7 +93,7 @@ public class S3ExportStorageProvider implements ExportStorageProvider
s3ExportConfig.getChunkSize(),
s3ExportConfig.getMaxRetry()
);
return new S3StorageConnector(s3OutputConfig, s3);
return new S3StorageConnector(s3OutputConfig, s3, s3UploadManager);
}
@VisibleForTesting

View File

@ -55,15 +55,17 @@ public class S3StorageConnector extends ChunkingStorageConnector<GetObjectReques
private final S3OutputConfig config;
private final ServerSideEncryptingAmazonS3 s3Client;
private final S3UploadManager s3UploadManager;
private static final String DELIM = "/";
private static final Joiner JOINER = Joiner.on(DELIM).skipNulls();
private static final int MAX_NUMBER_OF_LISTINGS = 1000;
public S3StorageConnector(S3OutputConfig config, ServerSideEncryptingAmazonS3 serverSideEncryptingAmazonS3)
public S3StorageConnector(S3OutputConfig config, ServerSideEncryptingAmazonS3 serverSideEncryptingAmazonS3, S3UploadManager s3UploadManager)
{
this.config = config;
this.s3Client = serverSideEncryptingAmazonS3;
this.s3UploadManager = s3UploadManager;
Preconditions.checkNotNull(config, "config is null");
Preconditions.checkNotNull(config.getTempDir(), "tempDir is null in s3 config");
try {
@ -153,7 +155,7 @@ public class S3StorageConnector extends ChunkingStorageConnector<GetObjectReques
@Override
public OutputStream write(String path) throws IOException
{
return new RetryableS3OutputStream(config, s3Client, objectPath(path));
return new RetryableS3OutputStream(config, s3Client, objectPath(path), s3UploadManager);
}
@Override

View File

@ -44,5 +44,6 @@ public class S3StorageConnectorModule implements DruidModule
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.export.storage.s3", S3ExportConfig.class);
JsonConfigProvider.bind(binder, "druid.msq.intermediate.storage", S3OutputConfig.class);
}
}

View File

@ -38,6 +38,9 @@ public class S3StorageConnectorProvider extends S3OutputConfig implements Storag
@JacksonInject
ServerSideEncryptingAmazonS3 s3;
@JacksonInject
S3UploadManager s3UploadManager;
@JsonCreator
public S3StorageConnectorProvider(
@JsonProperty(value = "bucket", required = true) String bucket,
@ -53,6 +56,6 @@ public class S3StorageConnectorProvider extends S3OutputConfig implements Storag
@Override
public StorageConnector get()
{
return new S3StorageConnector(this, s3);
return new S3StorageConnector(this, s3, s3UploadManager);
}
}

View File

@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.storage.s3.output;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.storage.s3.S3Utils;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
import org.apache.druid.utils.RuntimeInfo;
import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
/**
* This class manages uploading files to S3 in chunks, while ensuring that the
* number of chunks currently present on local disk does not exceed a specific limit.
*/
@ManageLifecycle
public class S3UploadManager
{
private final ExecutorService uploadExecutor;
private static final Logger log = new Logger(S3UploadManager.class);
@Inject
public S3UploadManager(S3OutputConfig s3OutputConfig, S3ExportConfig s3ExportConfig, RuntimeInfo runtimeInfo)
{
int poolSize = Math.max(4, runtimeInfo.getAvailableProcessors());
int maxNumChunksOnDisk = computeMaxNumChunksOnDisk(s3OutputConfig, s3ExportConfig);
this.uploadExecutor = createExecutorService(poolSize, maxNumChunksOnDisk);
log.info("Initialized executor service for S3 multipart upload with pool size [%d] and work queue capacity [%d]",
poolSize, maxNumChunksOnDisk);
}
/**
* Computes the maximum number of S3 upload chunks that can be kept on disk using the
* maximum chunk size specified in {@link S3OutputConfig} and {@link S3ExportConfig}.
*/
public static int computeMaxNumChunksOnDisk(S3OutputConfig s3OutputConfig, S3ExportConfig s3ExportConfig)
{
long maxChunkSize = S3OutputConfig.S3_MULTIPART_UPLOAD_MIN_PART_SIZE_BYTES;
if (s3OutputConfig != null && s3OutputConfig.getChunkSize() != null) {
maxChunkSize = Math.max(maxChunkSize, s3OutputConfig.getChunkSize());
}
if (s3ExportConfig != null && s3ExportConfig.getChunkSize() != null) {
maxChunkSize = Math.max(maxChunkSize, s3ExportConfig.getChunkSize().getBytes());
}
return (int) (S3OutputConfig.S3_MULTIPART_UPLOAD_MAX_PART_SIZE_BYTES / maxChunkSize);
}
/**
* Queues a chunk of a file for upload to S3 as part of a multipart upload.
*/
public Future<UploadPartResult> queueChunkForUpload(
ServerSideEncryptingAmazonS3 s3Client,
String key,
int chunkNumber,
File chunkFile,
String uploadId,
S3OutputConfig config
)
{
return uploadExecutor.submit(() -> RetryUtils.retry(
() -> {
log.debug("Uploading chunk[%d] for uploadId[%s].", chunkNumber, uploadId);
UploadPartResult uploadPartResult = uploadPartIfPossible(
s3Client,
uploadId,
config.getBucket(),
key,
chunkNumber,
chunkFile
);
if (!chunkFile.delete()) {
log.warn("Failed to delete chunk [%s]", chunkFile.getAbsolutePath());
}
return uploadPartResult;
},
S3Utils.S3RETRY,
config.getMaxRetry()
));
}
@VisibleForTesting
UploadPartResult uploadPartIfPossible(
ServerSideEncryptingAmazonS3 s3Client,
String uploadId,
String bucket,
String key,
int chunkNumber,
File chunkFile
)
{
final UploadPartRequest uploadPartRequest = new UploadPartRequest()
.withUploadId(uploadId)
.withBucketName(bucket)
.withKey(key)
.withFile(chunkFile)
.withPartNumber(chunkNumber)
.withPartSize(chunkFile.length());
if (log.isDebugEnabled()) {
log.debug("Pushing chunk[%s] to bucket[%s] and key[%s].", chunkNumber, bucket, key);
}
return s3Client.uploadPart(uploadPartRequest);
}
private ExecutorService createExecutorService(int poolSize, int maxNumConcurrentChunks)
{
return Execs.newBlockingThreaded("S3UploadThreadPool-%d", poolSize, maxNumConcurrentChunks);
}
@LifecycleStart
public void start()
{
// No state startup required
}
@LifecycleStop
public void stop()
{
uploadExecutor.shutdown();
}
}

View File

@ -31,12 +31,18 @@ import org.apache.druid.common.aws.AWSModule;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.StartupInjectorBuilder;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.query.DruidProcessingConfigTest;
import org.apache.druid.storage.StorageConnector;
import org.apache.druid.storage.StorageConnectorModule;
import org.apache.druid.storage.StorageConnectorProvider;
import org.apache.druid.storage.s3.output.S3ExportConfig;
import org.apache.druid.storage.s3.output.S3OutputConfig;
import org.apache.druid.storage.s3.output.S3StorageConnector;
import org.apache.druid.storage.s3.output.S3StorageConnectorModule;
import org.apache.druid.storage.s3.output.S3StorageConnectorProvider;
import org.apache.druid.storage.s3.output.S3UploadManager;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
@ -146,7 +152,15 @@ public class S3StorageConnectorProviderTest
.addValue(
ServerSideEncryptingAmazonS3.class,
new ServerSideEncryptingAmazonS3(null, new NoopServerSideEncryption())
));
)
.addValue(
S3UploadManager.class,
new S3UploadManager(
new S3OutputConfig("bucket", "prefix", EasyMock.mock(File.class), new HumanReadableBytes("5MiB"), 1),
new S3ExportConfig("tempDir", new HumanReadableBytes("5MiB"), 1, null),
new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0))
)
);
StorageConnectorProvider storageConnectorProvider = injector.getInstance(Key.get(

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import com.google.inject.Injector;
import org.apache.druid.common.aws.AWSModule;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.guice.ServerModule;
import org.apache.druid.segment.loading.OmniDataSegmentArchiver;
import org.apache.druid.segment.loading.OmniDataSegmentKiller;
import org.apache.druid.segment.loading.OmniDataSegmentMover;
@ -72,7 +73,8 @@ public class S3StorageDruidModuleTest
return GuiceInjectors.makeStartupInjectorWithModules(
ImmutableList.of(
new AWSModule(),
new S3StorageDruidModule()
new S3StorageDruidModule(),
new ServerModule()
)
);
}

View File

@ -33,10 +33,10 @@ import com.amazonaws.services.s3.model.UploadPartResult;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.DruidProcessingConfigTest;
import org.apache.druid.storage.s3.NoopServerSideEncryption;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@ -50,6 +50,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
public class RetryableS3OutputStreamTest
@ -63,10 +64,11 @@ public class RetryableS3OutputStreamTest
private final TestAmazonS3 s3 = new TestAmazonS3(0);
private final String path = "resultId";
private S3OutputConfig config;
private long chunkSize;
private S3UploadManager s3UploadManager;
@Before
public void setup() throws IOException
{
@ -99,6 +101,11 @@ public class RetryableS3OutputStreamTest
return 2;
}
};
s3UploadManager = new S3UploadManager(
new S3OutputConfig("bucket", "prefix", EasyMock.mock(File.class), new HumanReadableBytes("5MiB"), 1),
new S3ExportConfig("tempDir", new HumanReadableBytes("5MiB"), 1, null),
new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0));
}
@Test
@ -110,7 +117,7 @@ public class RetryableS3OutputStreamTest
config,
s3,
path,
false
s3UploadManager
)) {
for (int i = 0; i < 25; i++) {
bb.clear();
@ -132,7 +139,7 @@ public class RetryableS3OutputStreamTest
config,
s3,
path,
false
s3UploadManager
)) {
bb.clear();
bb.putInt(1);
@ -153,7 +160,7 @@ public class RetryableS3OutputStreamTest
config,
s3,
path,
false
s3UploadManager
)) {
for (int i = 0; i < 600; i++) {
out.write(i);
@ -175,7 +182,7 @@ public class RetryableS3OutputStreamTest
config,
s3,
path,
false
s3UploadManager
)) {
for (int i = 0; i < 25; i++) {
bb.clear();
@ -198,7 +205,7 @@ public class RetryableS3OutputStreamTest
config,
s3,
path,
false
s3UploadManager
)) {
for (int i = 0; i < 2; i++) {
bb.clear();
@ -206,9 +213,6 @@ public class RetryableS3OutputStreamTest
out.write(bb.array());
}
expectedException.expect(RuntimeException.class);
expectedException.expectCause(CoreMatchers.instanceOf(AmazonClientException.class));
expectedException.expectMessage("Upload failure test. Remaining failures [1]");
bb.clear();
bb.putInt(3);
out.write(bb.array());
@ -249,9 +253,11 @@ public class RetryableS3OutputStreamTest
new IOE("Upload failure test. Remaining failures [%s]", --uploadFailuresLeft)
);
}
synchronized (partRequests) {
partRequests.add(request);
}
UploadPartResult result = new UploadPartResult();
result.setETag(StringUtils.format("%s", request.getPartNumber()));
result.setETag(StringUtils.format("etag-%s", request.getPartNumber()));
result.setPartNumber(request.getPartNumber());
return result;
}
@ -275,8 +281,10 @@ public class RetryableS3OutputStreamTest
Assert.assertNotNull(completeRequest);
Assert.assertFalse(cancelled);
Set<Integer> partNumbersFromRequest = partRequests.stream().map(UploadPartRequest::getPartNumber).collect(Collectors.toSet());
Assert.assertEquals(partRequests.size(), partNumbersFromRequest.size());
for (int i = 0; i < partRequests.size(); i++) {
Assert.assertEquals(i + 1, partRequests.get(i).getPartNumber());
if (i < partRequests.size() - 1) {
Assert.assertEquals(chunkSize, partRequests.get(i).getPartSize());
} else {
@ -286,12 +294,12 @@ public class RetryableS3OutputStreamTest
final List<PartETag> eTags = completeRequest.getPartETags();
Assert.assertEquals(partRequests.size(), eTags.size());
Assert.assertEquals(
partRequests.stream().map(UploadPartRequest::getPartNumber).collect(Collectors.toList()),
eTags.stream().map(PartETag::getPartNumber).collect(Collectors.toList())
partNumbersFromRequest,
eTags.stream().map(PartETag::getPartNumber).collect(Collectors.toSet())
);
Assert.assertEquals(
partRequests.stream().map(UploadPartRequest::getPartNumber).collect(Collectors.toList()),
eTags.stream().map(tag -> Integer.parseInt(tag.getETag())).collect(Collectors.toList())
partNumbersFromRequest.stream().map(partNumber -> "etag-" + partNumber).collect(Collectors.toSet()),
eTags.stream().map(PartETag::getETag).collect(Collectors.toSet())
);
Assert.assertEquals(
expectedFileSize,

View File

@ -31,6 +31,8 @@ import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.query.DruidProcessingConfigTest;
import org.apache.druid.storage.StorageConnector;
import org.apache.druid.storage.s3.NoopServerSideEncryption;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
@ -77,14 +79,18 @@ public class S3StorageConnectorTest
public void setup()
{
try {
storageConnector = new S3StorageConnector(new S3OutputConfig(
S3OutputConfig s3OutputConfig = new S3OutputConfig(
BUCKET,
PREFIX,
temporaryFolder.newFolder(),
null,
null,
true
), service);
);
storageConnector = new S3StorageConnector(s3OutputConfig, service, new S3UploadManager(
s3OutputConfig,
new S3ExportConfig("tempDir", new HumanReadableBytes("5MiB"), 1, null),
new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0)));
}
catch (IOException e) {
throw new RuntimeException(e);

View File

@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.storage.s3.output;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.query.DruidProcessingConfigTest;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
import org.apache.druid.utils.RuntimeInfo;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.util.concurrent.Future;
import static org.junit.Assert.assertEquals;
public class S3UploadManagerTest
{
private S3UploadManager s3UploadManager;
private S3OutputConfig s3OutputConfig;
private S3ExportConfig s3ExportConfig;
@Before
public void setUp()
{
s3OutputConfig = new S3OutputConfig("bucket", "prefix", EasyMock.mock(File.class), new HumanReadableBytes("100MiB"), 1);
s3ExportConfig = new S3ExportConfig("tempDir", new HumanReadableBytes("200MiB"), 1, null);
final RuntimeInfo runtimeInfo = new DruidProcessingConfigTest.MockRuntimeInfo(8, 0, 0);
s3UploadManager = new S3UploadManager(s3OutputConfig, s3ExportConfig, runtimeInfo);
}
@Test
public void testQueueChunkForUpload() throws Exception
{
ServerSideEncryptingAmazonS3 s3Client = EasyMock.mock(ServerSideEncryptingAmazonS3.class);
File chunkFile = EasyMock.mock(File.class);
EasyMock.expect(chunkFile.length()).andReturn(1024L).anyTimes();
EasyMock.expect(chunkFile.delete()).andReturn(true).anyTimes();
int chunkId = 42;
UploadPartResult uploadPartResult = new UploadPartResult();
uploadPartResult.setPartNumber(chunkId);
uploadPartResult.setETag("etag");
EasyMock.expect(s3Client.uploadPart(EasyMock.anyObject(UploadPartRequest.class))).andReturn(uploadPartResult);
EasyMock.replay(chunkFile, s3Client);
Future<UploadPartResult> result = s3UploadManager.queueChunkForUpload(s3Client, "test-key", chunkId, chunkFile, "upload-id", s3OutputConfig);
UploadPartResult futureResult = result.get();
Assert.assertEquals(chunkId, futureResult.getPartNumber());
Assert.assertEquals("etag", futureResult.getETag());
}
@Test
public void testComputeMaxNumChunksOnDisk()
{
int maxNumConcurrentChunks = S3UploadManager.computeMaxNumChunksOnDisk(s3OutputConfig, s3ExportConfig);
int expectedMaxNumConcurrentChunks = 25; // maxChunkSizePossible/200 MB
assertEquals(expectedMaxNumConcurrentChunks, maxNumConcurrentChunks);
}
@Test
public void testComputeMaxNumChunksOnDiskWithNullOutputConfig()
{
// Null S3OutputConfig
int maxNumConcurrentChunks = S3UploadManager.computeMaxNumChunksOnDisk(null, s3ExportConfig);
int expectedMaxNumConcurrentChunks = 25; // maxChunkSizePossible / s3ExportConfig's chunk size
assertEquals(expectedMaxNumConcurrentChunks, maxNumConcurrentChunks);
// Null S3OutputConfig#getChunkSize()
maxNumConcurrentChunks = S3UploadManager.computeMaxNumChunksOnDisk(EasyMock.mock(S3OutputConfig.class), s3ExportConfig);
assertEquals(expectedMaxNumConcurrentChunks, maxNumConcurrentChunks);
}
@Test
public void testComputeMaxNumChunksOnDiskWithNullExportConfig()
{
// Null S3ExportConfig
int maxNumConcurrentChunks = S3UploadManager.computeMaxNumChunksOnDisk(s3OutputConfig, null);
int expectedMaxNumConcurrentChunks = 51; // maxChunkSizePossible / s3OutputConfig's chunk size
assertEquals(expectedMaxNumConcurrentChunks, maxNumConcurrentChunks);
// Null S3ExportConfig#getChunkSize()
maxNumConcurrentChunks = S3UploadManager.computeMaxNumChunksOnDisk(s3OutputConfig, EasyMock.mock(S3ExportConfig.class));
assertEquals(expectedMaxNumConcurrentChunks, maxNumConcurrentChunks);
}
@Test
public void testUploadPartIfPossible()
{
ServerSideEncryptingAmazonS3 s3Client = EasyMock.mock(ServerSideEncryptingAmazonS3.class);
File chunkFile = EasyMock.mock(File.class);
EasyMock.expect(chunkFile.length()).andReturn(1024L).anyTimes();
UploadPartResult uploadPartResult = new UploadPartResult();
Capture<UploadPartRequest> partRequestCapture = EasyMock.newCapture();
EasyMock.expect(s3Client.uploadPart(EasyMock.capture(partRequestCapture))).andReturn(uploadPartResult);
EasyMock.replay(s3Client, chunkFile);
UploadPartResult result = s3UploadManager.uploadPartIfPossible(s3Client, "upload-id", "bucket", "key", 1, chunkFile);
UploadPartRequest capturedRequest = partRequestCapture.getValue();
assertEquals("upload-id", capturedRequest.getUploadId());
assertEquals("bucket", capturedRequest.getBucketName());
assertEquals("key", capturedRequest.getKey());
assertEquals(1, capturedRequest.getPartNumber());
assertEquals(chunkFile, capturedRequest.getFile());
assertEquals(1024L, capturedRequest.getPartSize());
assertEquals(uploadPartResult, result);
}
@After
public void teardown()
{
s3UploadManager.stop();
}
}

View File

@ -42,7 +42,7 @@ import java.util.Iterator;
* <li>{@code druid.extension.custom.type="s3"}
* <li>{@code druid.extension.custom.bucket="myBucket"}
* </ul>
* The final state of this inteface would have
* The final state of this interface would have
* <ol>
* <li>Future Non blocking API's</li>
* <li>Offset based fetch</li>