mirror of https://github.com/apache/druid.git
Robust handling and management of S3 streams for MSQ shuffle storage (#13741)
This commit is contained in:
parent
b33962cab7
commit
a0f8889f23
|
@ -25,19 +25,33 @@ import com.amazonaws.services.s3.model.ListObjectsV2Request;
|
|||
import com.amazonaws.services.s3.model.ListObjectsV2Result;
|
||||
import com.amazonaws.services.s3.model.S3ObjectSummary;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Predicates;
|
||||
import org.apache.druid.data.input.impl.RetryingInputStream;
|
||||
import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
|
||||
import org.apache.druid.java.util.common.FileUtils;
|
||||
import org.apache.druid.java.util.common.IAE;
|
||||
import org.apache.druid.java.util.common.IOE;
|
||||
import org.apache.druid.java.util.common.RE;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.storage.StorageConnector;
|
||||
import org.apache.druid.storage.s3.S3Utils;
|
||||
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.io.SequenceInputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Enumeration;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class S3StorageConnector implements StorageConnector
|
||||
|
@ -48,11 +62,23 @@ public class S3StorageConnector implements StorageConnector
|
|||
|
||||
private static final String DELIM = "/";
|
||||
private static final Joiner JOINER = Joiner.on(DELIM).skipNulls();
|
||||
private static final long DOWNLOAD_MAX_CHUNK_SIZE = 100_000_000;
|
||||
|
||||
public S3StorageConnector(S3OutputConfig config, ServerSideEncryptingAmazonS3 serverSideEncryptingAmazonS3)
|
||||
{
|
||||
this.config = config;
|
||||
this.s3Client = serverSideEncryptingAmazonS3;
|
||||
Preconditions.checkNotNull(config, "config is null");
|
||||
Preconditions.checkNotNull(config.getTempDir(), "tempDir is null in s3 config");
|
||||
try {
|
||||
FileUtils.mkdirp(config.getTempDir());
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new RE(
|
||||
e,
|
||||
StringUtils.format("Cannot create tempDir : [%s] for s3 storage connector", config.getTempDir())
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -62,13 +88,13 @@ public class S3StorageConnector implements StorageConnector
|
|||
}
|
||||
|
||||
@Override
|
||||
public InputStream read(String path) throws IOException
|
||||
public InputStream read(String path)
|
||||
{
|
||||
return buildInputStream(new GetObjectRequest(config.getBucket(), objectPath(path)));
|
||||
return buildInputStream(new GetObjectRequest(config.getBucket(), objectPath(path)), path);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream readRange(String path, long from, long size) throws IOException
|
||||
public InputStream readRange(String path, long from, long size)
|
||||
{
|
||||
if (from < 0 || size < 0) {
|
||||
throw new IAE(
|
||||
|
@ -78,35 +104,115 @@ public class S3StorageConnector implements StorageConnector
|
|||
size
|
||||
);
|
||||
}
|
||||
return buildInputStream(new GetObjectRequest(config.getBucket(), objectPath(path)).withRange(from, from + size - 1));
|
||||
return buildInputStream(
|
||||
new GetObjectRequest(config.getBucket(), objectPath(path)).withRange(from, from + size - 1),
|
||||
path
|
||||
);
|
||||
}
|
||||
|
||||
private RetryingInputStream buildInputStream(GetObjectRequest getObjectRequest) throws IOException
|
||||
private InputStream buildInputStream(GetObjectRequest getObjectRequest, String path)
|
||||
{
|
||||
return new RetryingInputStream<>(
|
||||
getObjectRequest,
|
||||
new ObjectOpenFunction<GetObjectRequest>()
|
||||
{
|
||||
@Override
|
||||
public InputStream open(GetObjectRequest object)
|
||||
{
|
||||
return s3Client.getObject(object).getObjectContent();
|
||||
}
|
||||
// fetch the size of the whole object to make chunks
|
||||
long readEnd;
|
||||
AtomicLong currReadStart = new AtomicLong(0);
|
||||
if (getObjectRequest.getRange() != null) {
|
||||
currReadStart.set(getObjectRequest.getRange()[0]);
|
||||
readEnd = getObjectRequest.getRange()[1] + 1;
|
||||
} else {
|
||||
readEnd = this.s3Client.getObjectMetadata(config.getBucket(), objectPath(path)).getInstanceLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream open(GetObjectRequest object, long offset)
|
||||
{
|
||||
final GetObjectRequest offsetObjectRequest = new GetObjectRequest(
|
||||
object.getBucketName(),
|
||||
object.getKey()
|
||||
// build a sequence input stream from chunks
|
||||
return new SequenceInputStream(new Enumeration<InputStream>()
|
||||
{
|
||||
@Override
|
||||
public boolean hasMoreElements()
|
||||
{
|
||||
// don't stop until the whole object is downloaded
|
||||
return currReadStart.get() < readEnd;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream nextElement()
|
||||
{
|
||||
File outFile = new File(config.getTempDir().getAbsolutePath(), UUID.randomUUID().toString());
|
||||
// in a single chunk, only download a maximum of DOWNLOAD_MAX_CHUNK_SIZE
|
||||
long endPoint = Math.min(currReadStart.get() + DOWNLOAD_MAX_CHUNK_SIZE, readEnd) - 1;
|
||||
try {
|
||||
if (!outFile.createNewFile()) {
|
||||
throw new IOE(
|
||||
StringUtils.format(
|
||||
"Could not create temporary file [%s] for copying [%s]",
|
||||
outFile.getAbsolutePath(),
|
||||
objectPath(path)
|
||||
)
|
||||
);
|
||||
offsetObjectRequest.setRange(offset);
|
||||
return open(offsetObjectRequest);
|
||||
}
|
||||
},
|
||||
S3Utils.S3RETRY,
|
||||
config.getMaxRetry()
|
||||
);
|
||||
FileUtils.copyLarge(
|
||||
() -> new RetryingInputStream<>(
|
||||
new GetObjectRequest(
|
||||
config.getBucket(),
|
||||
objectPath(path)
|
||||
).withRange(currReadStart.get(), endPoint),
|
||||
new ObjectOpenFunction<GetObjectRequest>()
|
||||
{
|
||||
@Override
|
||||
public InputStream open(GetObjectRequest object)
|
||||
{
|
||||
return s3Client.getObject(object).getObjectContent();
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream open(GetObjectRequest object, long offset)
|
||||
{
|
||||
if (object.getRange() != null) {
|
||||
long[] oldRange = object.getRange();
|
||||
object.setRange(oldRange[0] + offset, oldRange[1]);
|
||||
} else {
|
||||
object.setRange(offset);
|
||||
}
|
||||
return open(object);
|
||||
}
|
||||
},
|
||||
S3Utils.S3RETRY,
|
||||
3
|
||||
),
|
||||
outFile,
|
||||
new byte[8 * 1024],
|
||||
Predicates.alwaysFalse(),
|
||||
1,
|
||||
StringUtils.format("Retrying copying of [%s] to [%s]", objectPath(path), outFile.getAbsolutePath())
|
||||
);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new RE(e, StringUtils.format("Unable to copy [%s] to [%s]", objectPath(path), outFile));
|
||||
}
|
||||
try {
|
||||
AtomicBoolean isClosed = new AtomicBoolean(false);
|
||||
return new FileInputStream(outFile)
|
||||
{
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
// close should be idempotent
|
||||
if (isClosed.get()) {
|
||||
return;
|
||||
}
|
||||
isClosed.set(true);
|
||||
super.close();
|
||||
// since endPoint is inclusive in s3's get request API, the next currReadStart is endpoint + 1
|
||||
currReadStart.set(endPoint + 1);
|
||||
if (!outFile.delete()) {
|
||||
throw new RE("Cannot delete temp file [%s]", outFile);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
catch (FileNotFoundException e) {
|
||||
throw new RE(e, StringUtils.format("Unable to find temp file [%s]", outFile));
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -24,6 +24,7 @@ import com.amazonaws.services.s3.model.DeleteObjectsRequest;
|
|||
import com.amazonaws.services.s3.model.GetObjectRequest;
|
||||
import com.amazonaws.services.s3.model.ListObjectsV2Request;
|
||||
import com.amazonaws.services.s3.model.ListObjectsV2Result;
|
||||
import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||
import com.amazonaws.services.s3.model.S3Object;
|
||||
import com.amazonaws.services.s3.model.S3ObjectSummary;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
|
@ -104,9 +105,15 @@ public class S3StorageConnectorTest
|
|||
public void pathRead() throws IOException
|
||||
{
|
||||
EasyMock.reset(S3_CLIENT);
|
||||
ObjectMetadata objectMetadata = new ObjectMetadata();
|
||||
long contentLength = "test".getBytes(StandardCharsets.UTF_8).length;
|
||||
objectMetadata.setContentLength(contentLength);
|
||||
S3Object s3Object = new S3Object();
|
||||
s3Object.setObjectContent(new ByteArrayInputStream("test".getBytes(StandardCharsets.UTF_8)));
|
||||
EasyMock.expect(S3_CLIENT.getObject(new GetObjectRequest(BUCKET, PREFIX + "/" + TEST_FILE))).andReturn(s3Object);
|
||||
EasyMock.expect(S3_CLIENT.getObjectMetadata(EasyMock.anyObject())).andReturn(objectMetadata);
|
||||
EasyMock.expect(S3_CLIENT.getObject(
|
||||
new GetObjectRequest(BUCKET, PREFIX + "/" + TEST_FILE).withRange(0, contentLength - 1))
|
||||
).andReturn(s3Object);
|
||||
EasyMock.replay(S3_CLIENT);
|
||||
|
||||
String readText = new BufferedReader(
|
||||
|
@ -141,8 +148,8 @@ public class S3StorageConnectorTest
|
|||
|
||||
InputStream is = storageConnector.readRange(TEST_FILE, start, length);
|
||||
byte[] dataBytes = new byte[length];
|
||||
Assert.assertEquals(is.read(dataBytes), length);
|
||||
Assert.assertEquals(is.read(), -1); // reading further produces no data
|
||||
Assert.assertEquals(length, is.read(dataBytes));
|
||||
Assert.assertEquals(-1, is.read()); // reading further produces no data
|
||||
Assert.assertEquals(dataQueried, new String(dataBytes, StandardCharsets.UTF_8));
|
||||
EasyMock.reset(S3_CLIENT);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue