diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java
index 184e1bdfe10..ea583290af4 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java
@@ -25,19 +25,33 @@ import com.amazonaws.services.s3.model.ListObjectsV2Request;
 import com.amazonaws.services.s3.model.ListObjectsV2Result;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
 import org.apache.druid.data.input.impl.RetryingInputStream;
 import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
+import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.storage.StorageConnector;
 import org.apache.druid.storage.s3.S3Utils;
 import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
 
 import javax.annotation.Nonnull;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.SequenceInputStream;
 import java.util.ArrayList;
+import java.util.Enumeration;
 import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
 public class S3StorageConnector implements StorageConnector
@@ -48,11 +62,23 @@ public class S3StorageConnector implements StorageConnector
 
   private static final String DELIM = "/";
   private static final Joiner JOINER = Joiner.on(DELIM).skipNulls();
+  private static final long DOWNLOAD_MAX_CHUNK_SIZE = 100_000_000;
 
   public S3StorageConnector(S3OutputConfig config, ServerSideEncryptingAmazonS3 serverSideEncryptingAmazonS3)
   {
     this.config = config;
     this.s3Client = serverSideEncryptingAmazonS3;
+    Preconditions.checkNotNull(config, "config is null");
+    Preconditions.checkNotNull(config.getTempDir(), "tempDir is null in s3 config");
+    try {
+      FileUtils.mkdirp(config.getTempDir());
+    }
+    catch (IOException e) {
+      throw new RE(
+          e,
+          StringUtils.format("Cannot create tempDir : [%s] for s3 storage connector", config.getTempDir())
+      );
+    }
   }
 
   @Override
@@ -62,13 +88,13 @@ public class S3StorageConnector implements StorageConnector
   }
 
   @Override
-  public InputStream read(String path) throws IOException
+  public InputStream read(String path)
   {
-    return buildInputStream(new GetObjectRequest(config.getBucket(), objectPath(path)));
+    return buildInputStream(new GetObjectRequest(config.getBucket(), objectPath(path)), path);
   }
 
   @Override
-  public InputStream readRange(String path, long from, long size) throws IOException
+  public InputStream readRange(String path, long from, long size)
   {
     if (from < 0 || size < 0) {
       throw new IAE(
@@ -78,35 +104,115 @@ public class S3StorageConnector implements StorageConnector
           size
       );
     }
-    return buildInputStream(new GetObjectRequest(config.getBucket(), objectPath(path)).withRange(from, from + size - 1));
+    return buildInputStream(
+        new GetObjectRequest(config.getBucket(), objectPath(path)).withRange(from, from + size - 1),
+        path
+    );
   }
 
-  private RetryingInputStream<GetObjectRequest> buildInputStream(GetObjectRequest getObjectRequest) throws IOException
+  private InputStream buildInputStream(GetObjectRequest getObjectRequest, String path)
   {
-    return new RetryingInputStream<>(
-        getObjectRequest,
-        new ObjectOpenFunction<GetObjectRequest>()
-        {
-          @Override
-          public InputStream open(GetObjectRequest object)
-          {
-            return s3Client.getObject(object).getObjectContent();
-          }
+    // fetch the size of the whole object to make chunks
+    long readEnd;
+    AtomicLong currReadStart = new AtomicLong(0);
+    if (getObjectRequest.getRange() != null) {
+      currReadStart.set(getObjectRequest.getRange()[0]);
+      readEnd = getObjectRequest.getRange()[1] + 1;
+    } else {
+      readEnd = this.s3Client.getObjectMetadata(config.getBucket(), objectPath(path)).getInstanceLength();
+    }
 
-          @Override
-          public InputStream open(GetObjectRequest object, long offset)
-          {
-            final GetObjectRequest offsetObjectRequest = new GetObjectRequest(
-                object.getBucketName(),
-                object.getKey()
+    // build a sequence input stream from chunks
+    return new SequenceInputStream(new Enumeration<InputStream>()
+    {
+      @Override
+      public boolean hasMoreElements()
+      {
+        // don't stop until the whole object is downloaded
+        return currReadStart.get() < readEnd;
+      }
+
+      @Override
+      public InputStream nextElement()
+      {
+        File outFile = new File(config.getTempDir().getAbsolutePath(), UUID.randomUUID().toString());
+        // in a single chunk, only download a maximum of DOWNLOAD_MAX_CHUNK_SIZE
+        long endPoint = Math.min(currReadStart.get() + DOWNLOAD_MAX_CHUNK_SIZE, readEnd) - 1;
+        try {
+          if (!outFile.createNewFile()) {
+            throw new IOE(
+                StringUtils.format(
+                    "Could not create temporary file [%s] for copying [%s]",
+                    outFile.getAbsolutePath(),
+                    objectPath(path)
+                )
             );
-            offsetObjectRequest.setRange(offset);
-            return open(offsetObjectRequest);
           }
-        },
-        S3Utils.S3RETRY,
-        config.getMaxRetry()
-    );
+          FileUtils.copyLarge(
+              () -> new RetryingInputStream<>(
+                  new GetObjectRequest(
+                      config.getBucket(),
+                      objectPath(path)
+                  ).withRange(currReadStart.get(), endPoint),
+                  new ObjectOpenFunction<GetObjectRequest>()
+                  {
+                    @Override
+                    public InputStream open(GetObjectRequest object)
+                    {
+                      return s3Client.getObject(object).getObjectContent();
+                    }
+
+                    @Override
+                    public InputStream open(GetObjectRequest object, long offset)
+                    {
+                      if (object.getRange() != null) {
+                        long[] oldRange = object.getRange();
+                        object.setRange(oldRange[0] + offset, oldRange[1]);
+                      } else {
+                        object.setRange(offset);
+                      }
+                      return open(object);
+                    }
+                  },
+                  S3Utils.S3RETRY,
+                  3
+              ),
+              outFile,
+              new byte[8 * 1024],
+              Predicates.alwaysFalse(),
+              1,
+              StringUtils.format("Retrying copying of [%s] to [%s]", objectPath(path), outFile.getAbsolutePath())
+          );
+        }
+        catch (IOException e) {
+          throw new RE(e, StringUtils.format("Unable to copy [%s] to [%s]", objectPath(path), outFile));
+        }
+        try {
+          AtomicBoolean isClosed = new AtomicBoolean(false);
+          return new FileInputStream(outFile)
+          {
+            @Override
+            public void close() throws IOException
+            {
+              // close should be idempotent
+              if (isClosed.get()) {
+                return;
+              }
+              isClosed.set(true);
+              super.close();
+              // since endPoint is inclusive in s3's get request API, the next currReadStart is endpoint + 1
+              currReadStart.set(endPoint + 1);
+              if (!outFile.delete()) {
+                throw new RE("Cannot delete temp file [%s]", outFile);
+              }
+            }
+          };
+        }
+        catch (FileNotFoundException e) {
+          throw new RE(e, StringUtils.format("Unable to find temp file [%s]", outFile));
+        }
+      }
+    });
   }
 
   @Override
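Illustrative sketch (not part of the patch): the new buildInputStream above boils down to the pattern below. The object is fetched in fixed-size ranges, each range is spilled to a local temp file, and the per-chunk file streams are stitched together with a SequenceInputStream, so the caller sees one continuous stream while at most one chunk is held locally at a time. The names ChunkedObjectStream, RangeFetcher, CHUNK_SIZE and tempDir are assumed for the sketch; the patch itself layers S3-specific retries, range handling and an idempotent close on top of this idea.

// Minimal sketch of the chunked-download pattern, assuming a generic RangeFetcher
// in place of the ranged S3 GET used by the patch.
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.Enumeration;

public class ChunkedObjectStream
{
  private static final long CHUNK_SIZE = 100_000_000; // mirrors DOWNLOAD_MAX_CHUNK_SIZE

  @FunctionalInterface
  public interface RangeFetcher
  {
    InputStream fetch(long startInclusive, long endInclusive) throws IOException;
  }

  public static InputStream open(long objectSize, File tempDir, RangeFetcher fetchRange)
  {
    final long[] nextStart = {0}; // advanced only after a chunk has been fully read and closed

    return new SequenceInputStream(new Enumeration<InputStream>()
    {
      @Override
      public boolean hasMoreElements()
      {
        return nextStart[0] < objectSize;
      }

      @Override
      public InputStream nextElement()
      {
        final long start = nextStart[0];
        final long endInclusive = Math.min(start + CHUNK_SIZE, objectSize) - 1;
        try {
          // spill this range to a temp file so only one chunk is held locally at a time
          final File chunkFile = File.createTempFile("chunk", null, tempDir);
          try (InputStream remote = fetchRange.fetch(start, endInclusive)) {
            Files.copy(remote, chunkFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
          }
          return new FileInputStream(chunkFile)
          {
            @Override
            public void close() throws IOException
            {
              super.close();
              nextStart[0] = endInclusive + 1; // the range end is inclusive, so resume one past it
              chunkFile.delete();              // best-effort cleanup of the spilled chunk
            }
          };
        }
        catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      }
    });
  }
}

SequenceInputStream only asks the Enumeration for the next element after the previous stream has hit end-of-file and been closed, which is why advancing the read position inside close(), here as in the patch, is safe.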
diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java
index 154918cc2d9..0a02dce4d28 100644
--- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java
+++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java
@@ -24,6 +24,7 @@ import com.amazonaws.services.s3.model.DeleteObjectsRequest;
 import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.ListObjectsV2Request;
 import com.amazonaws.services.s3.model.ListObjectsV2Result;
+import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.google.common.collect.ImmutableList;
@@ -104,9 +105,15 @@ public class S3StorageConnectorTest
   public void pathRead() throws IOException
   {
     EasyMock.reset(S3_CLIENT);
+    ObjectMetadata objectMetadata = new ObjectMetadata();
+    long contentLength = "test".getBytes(StandardCharsets.UTF_8).length;
+    objectMetadata.setContentLength(contentLength);
     S3Object s3Object = new S3Object();
     s3Object.setObjectContent(new ByteArrayInputStream("test".getBytes(StandardCharsets.UTF_8)));
-    EasyMock.expect(S3_CLIENT.getObject(new GetObjectRequest(BUCKET, PREFIX + "/" + TEST_FILE))).andReturn(s3Object);
+    EasyMock.expect(S3_CLIENT.getObjectMetadata(EasyMock.anyObject())).andReturn(objectMetadata);
+    EasyMock.expect(S3_CLIENT.getObject(
+        new GetObjectRequest(BUCKET, PREFIX + "/" + TEST_FILE).withRange(0, contentLength - 1))
+    ).andReturn(s3Object);
     EasyMock.replay(S3_CLIENT);
 
     String readText = new BufferedReader(
@@ -141,8 +148,8 @@ public class S3StorageConnectorTest
 
     InputStream is = storageConnector.readRange(TEST_FILE, start, length);
     byte[] dataBytes = new byte[length];
-    Assert.assertEquals(is.read(dataBytes), length);
-    Assert.assertEquals(is.read(), -1); // reading further produces no data
+    Assert.assertEquals(length, is.read(dataBytes));
+    Assert.assertEquals(-1, is.read()); // reading further produces no data
     Assert.assertEquals(dataQueried, new String(dataBytes, StandardCharsets.UTF_8));
     EasyMock.reset(S3_CLIENT);
   }
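For reference, a caller-side view of the contract the updated tests pin down: read(path) and readRange(path, from, size) now return a plain InputStream with no checked exception at call time, and a range read yields exactly size bytes followed by end-of-stream. The helper below is an assumed, illustrative snippet (not part of the patch); unlike the test, it loops on read(), since a single call may legally return fewer bytes than requested.

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import org.apache.druid.storage.StorageConnector;

public class StorageConnectorReadExample
{
  // Reads `size` bytes of `path` starting at `from` and decodes them as UTF-8.
  static String readRangeAsString(StorageConnector connector, String path, long from, int size) throws IOException
  {
    final byte[] buf = new byte[size];
    try (InputStream in = connector.readRange(path, from, size)) {
      int off = 0;
      int n;
      // loop: a single read() may return fewer bytes than requested
      while (off < size && (n = in.read(buf, off, size - off)) != -1) {
        off += n;
      }
      return new String(buf, 0, off, StandardCharsets.UTF_8);
    }
  }
}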