More fixes to parallel download resource cleanup

This commit is contained in:
Zack Shoylev 2016-09-13 01:20:08 -05:00
parent de68c2a1b0
commit 4bbca9edf9
2 changed files with 51 additions and 18 deletions

View File

@ -31,12 +31,14 @@ import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.RandomAccessFile;
import java.lang.reflect.Method;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
@ -677,6 +679,7 @@ public class RegionScopedSwiftBlobStore implements BlobStore {
ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(executor);
RandomAccessFile raf = null;
File tempFile = new File(destination.getName() + "." + UUID.randomUUID());
try {
long contentLength = api
.getObjectApi(regionId, container)
@ -686,7 +689,7 @@ public class RegionScopedSwiftBlobStore implements BlobStore {
.getContentLength();
// Reserve space for performance reasons
raf = new RandomAccessFile(destination.getAbsoluteFile(), "rw");
raf = new RandomAccessFile(tempFile, "rw");
raf.seek(contentLength - 1);
raf.write(0);
@ -706,12 +709,24 @@ public class RegionScopedSwiftBlobStore implements BlobStore {
Futures.getUnchecked(Futures.allAsList(results));
raf.getChannel().force(true);
raf.getChannel().close();
raf.close();
if (destination.exists()) {
destination.delete();
}
if (!tempFile.renameTo(destination)) {
throw new RuntimeException("Could not move temporary downloaded file to destination " + destination);
}
tempFile = null;
} catch (IOException e) {
Closeables2.closeQuietly(raf);
destination.delete();
throw new RuntimeException(e);
} finally {
Closeables2.closeQuietly(raf);
if (tempFile != null) {
tempFile.delete();
}
}
}
@ -745,6 +760,11 @@ public class RegionScopedSwiftBlobStore implements BlobStore {
MappedByteBuffer out = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, begin, end - begin + 1);
out.put(targetArray);
out.force();
// JDK-4715154 ; TODO: Java 8 FileChannels
if (System.getProperty("os.name").toLowerCase().contains("windows")) {
closeDirectBuffer(out);
}
} catch (IOException e) {
lastException = e;
continue;
@ -753,8 +773,24 @@ public class RegionScopedSwiftBlobStore implements BlobStore {
return null;
}
throw new RuntimeException("After " + retryCountLimit + " retries: " + lastException);
}
// JDK-4715154
private void closeDirectBuffer(MappedByteBuffer mbb) {
if ( mbb == null || !mbb.isDirect() )
return;
try {
Method cleaner = mbb.getClass().getMethod("cleaner");
cleaner.setAccessible(true);
Method clean = Class.forName("sun.misc.Cleaner").getMethod("clean");
clean.setAccessible(true);
clean.invoke(cleaner.invoke(mbb));
} catch (Exception e) {
logger.warn(e.toString());
}
}
}
@Beta
@Override
@ -799,8 +835,8 @@ public class RegionScopedSwiftBlobStore implements BlobStore {
public void run() {
ListenableFuture<byte[]> result;
long from;
for (from = 0; from < contentLength; from = from + partSize) {
try {
try {
for (from = 0; from < contentLength; from = from + partSize) {
logger.debug(Thread.currentThread() + " writing to output");
result = results.take();
if (result == null) {
@ -809,20 +845,16 @@ public class RegionScopedSwiftBlobStore implements BlobStore {
throw new RuntimeException("Error downloading file part to stream");
}
output.write(result.get());
} catch (Exception e) {
logger.debug(e.toString());
try {
// close pipe so client is notified of an exception
input.close();
} catch (IOException e1) {}
try {
output.close();
} catch (IOException e1) {}
throw new RuntimeException(e);
}
} catch (Exception e) {
logger.debug(e.toString());
// Close pipe so client is notified of an exception
Closeables2.closeQuietly(input);
throw new RuntimeException(e);
} finally {
// Finished writing results to stream
Closeables2.closeQuietly(output);
}
// Finished writing results to stream
Closeables2.closeQuietly(output);
}
});

View File

@ -57,7 +57,7 @@ import com.google.common.util.concurrent.MoreExecutors;
public class RegionScopedSwiftBlobStoreParallelLiveTest extends BaseBlobStoreIntegrationTest {
private final File BIG_FILE = new File("random.dat");
private final long SIZE = 1000000000; //10 * 1000 * 1000;
private final long SIZE = 10 * 1000 * 1000;
private BlobStore blobStore;
private String ETAG;
private ListeningExecutorService executor =
@ -100,6 +100,7 @@ public class RegionScopedSwiftBlobStoreParallelLiveTest extends BaseBlobStoreInt
public void cleanupFiles() {
// Delete local file
delete(BIG_FILE);
delete(new File(BIG_FILE + ".downloaded"));
// Delete uploaded file
blobStore.clearContainer(CONTAINER);