mirror of https://github.com/apache/jclouds.git
Lazily open parts during LocalBlobStore complete MPU
This removes a previous workaround for opening too many FileInputStream and exhausting rlimits.
This commit is contained in:
parent
12de6ef74d
commit
b7f28f1e6a
|
@ -28,7 +28,6 @@ import static com.google.common.collect.Sets.newTreeSet;
|
|||
import static org.jclouds.blobstore.options.ListContainerOptions.Builder.recursive;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.ArrayList;
|
||||
|
@ -731,14 +730,8 @@ public final class LocalBlobStore implements BlobStore {
|
|||
Payload payload;
|
||||
try {
|
||||
InputStream is = blob.getPayload().openStream();
|
||||
if (is instanceof FileInputStream) {
|
||||
// except for FileInputStream since large MPU can open too many fds
|
||||
is.close();
|
||||
payload = blob.getPayload();
|
||||
} else {
|
||||
blob.resetPayload(/*release=*/ false);
|
||||
payload = new InputStreamPayload(is);
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
throw new RuntimeException(ioe);
|
||||
}
|
||||
|
@ -825,16 +818,16 @@ public final class LocalBlobStore implements BlobStore {
|
|||
|
||||
@Override
|
||||
public String completeMultipartUpload(MultipartUpload mpu, List<MultipartPart> parts) {
|
||||
ImmutableList.Builder<Blob> blobs = ImmutableList.builder();
|
||||
ImmutableList.Builder<BlobMetadata> metas = ImmutableList.builder();
|
||||
long contentLength = 0;
|
||||
Hasher md5Hasher = Hashing.md5().newHasher();
|
||||
|
||||
for (MultipartPart part : parts) {
|
||||
Blob blobPart = getBlob(mpu.containerName(), MULTIPART_PREFIX + mpu.id() + "-" + mpu.blobName() + "-" + part.partNumber());
|
||||
contentLength += blobPart.getMetadata().getContentMetadata().getContentLength();
|
||||
blobs.add(blobPart);
|
||||
if (blobPart.getMetadata().getETag() != null) {
|
||||
md5Hasher.putBytes(BaseEncoding.base16().lowerCase().decode(blobPart.getMetadata().getETag()));
|
||||
BlobMetadata meta = blobMetadata(mpu.containerName(), MULTIPART_PREFIX + mpu.id() + "-" + mpu.blobName() + "-" + part.partNumber());
|
||||
contentLength += meta.getContentMetadata().getContentLength();
|
||||
metas.add(meta);
|
||||
if (meta.getETag() != null) {
|
||||
md5Hasher.putBytes(BaseEncoding.base16().lowerCase().decode(meta.getETag()));
|
||||
}
|
||||
}
|
||||
String mpuETag = new StringBuilder("\"")
|
||||
|
@ -845,7 +838,7 @@ public final class LocalBlobStore implements BlobStore {
|
|||
.toString();
|
||||
PayloadBlobBuilder blobBuilder = blobBuilder(mpu.blobName())
|
||||
.userMetadata(mpu.blobMetadata().getUserMetadata())
|
||||
.payload(new MultiBlobInputStream(blobs.build()))
|
||||
.payload(new MultiBlobInputStream(this, metas.build()))
|
||||
.contentLength(contentLength)
|
||||
.eTag(mpuETag);
|
||||
String cacheControl = mpu.blobMetadata().getContentMetadata().getCacheControl();
|
||||
|
@ -995,21 +988,24 @@ public final class LocalBlobStore implements BlobStore {
|
|||
}
|
||||
|
||||
private static final class MultiBlobInputStream extends InputStream {
|
||||
private final Iterator<Blob> blobs;
|
||||
private final BlobStore blobStore;
|
||||
private final Iterator<BlobMetadata> metas;
|
||||
private InputStream current;
|
||||
|
||||
MultiBlobInputStream(List<Blob> blobs) {
|
||||
this.blobs = blobs.iterator();
|
||||
MultiBlobInputStream(BlobStore blobStore, List<BlobMetadata> metas) {
|
||||
this.blobStore = blobStore;
|
||||
this.metas = metas.iterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read() throws IOException {
|
||||
while (true) {
|
||||
if (current == null) {
|
||||
if (!blobs.hasNext()) {
|
||||
if (!metas.hasNext()) {
|
||||
return -1;
|
||||
}
|
||||
current = blobs.next().getPayload().openStream();
|
||||
BlobMetadata meta = metas.next();
|
||||
current = blobStore.getBlob(meta.getContainer(), meta.getName()).getPayload().openStream();
|
||||
}
|
||||
int result = current.read();
|
||||
if (result == -1) {
|
||||
|
@ -1025,10 +1021,11 @@ public final class LocalBlobStore implements BlobStore {
|
|||
public int read(byte[] b, int off, int len) throws IOException {
|
||||
while (true) {
|
||||
if (current == null) {
|
||||
if (!blobs.hasNext()) {
|
||||
if (!metas.hasNext()) {
|
||||
return -1;
|
||||
}
|
||||
current = blobs.next().getPayload().openStream();
|
||||
BlobMetadata meta = metas.next();
|
||||
current = blobStore.getBlob(meta.getContainer(), meta.getName()).getPayload().openStream();
|
||||
}
|
||||
int result = current.read(b, off, len);
|
||||
if (result == -1) {
|
||||
|
|
Loading…
Reference in New Issue