FileSmoosher requested changes (#3673)

* FileSmoosher requested changes 

from https://github.com/metamx/java-util/pull/55

* Addressed code review requested changes.
This commit is contained in:
Akash Dwivedi 2016-11-10 10:11:49 -08:00 committed by Fangjin Yang
parent b76b3f8d85
commit 1acc816196
2 changed files with 55 additions and 50 deletions

View File

@ -31,6 +31,7 @@ import io.druid.java.util.common.FileUtils;
import io.druid.java.util.common.IAE; import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE; import io.druid.java.util.common.ISE;
import io.druid.java.util.common.MappedByteBufferHandler; import io.druid.java.util.common.MappedByteBufferHandler;
import io.druid.java.util.common.logger.Logger;
import java.io.BufferedWriter; import java.io.BufferedWriter;
import java.io.Closeable; import java.io.Closeable;
@ -59,7 +60,9 @@ import java.util.Set;
* various "chunk" files will be varying sizes and it is not possible to add a * various "chunk" files will be varying sizes and it is not possible to add a
* file of size greater than Integer.MAX_VALUE. * file of size greater than Integer.MAX_VALUE.
* <p/> * <p/>
* This class is not thread safe but allows writing multiple files even if main * This class is not thread safe.
* <p/>
* This class allows writing multiple files even if main
* smoosh file writer is open. If main smoosh file writer is already open, it * smoosh file writer is open. If main smoosh file writer is already open, it
* delegates the write into temporary file on the file system which is later * delegates the write into temporary file on the file system which is later
* copied on to the main smoosh file and underlying temporary file will be * copied on to the main smoosh file and underlying temporary file will be
@ -69,6 +72,7 @@ public class FileSmoosher implements Closeable
{ {
private static final String FILE_EXTENSION = "smoosh"; private static final String FILE_EXTENSION = "smoosh";
private static final Joiner joiner = Joiner.on(","); private static final Joiner joiner = Joiner.on(",");
private static final Logger LOG = new Logger(FileSmoosher.class);
private final File baseDir; private final File baseDir;
private final int maxChunkSize; private final int maxChunkSize;
@ -101,6 +105,16 @@ public class FileSmoosher implements Closeable
Preconditions.checkArgument(maxChunkSize > 0, "maxChunkSize must be a positive value."); Preconditions.checkArgument(maxChunkSize > 0, "maxChunkSize must be a positive value.");
} }
static File metaFile(File baseDir)
{
return new File(baseDir, String.format("meta.%s", FILE_EXTENSION));
}
static File makeChunkFile(File baseDir, int i)
{
return new File(baseDir, String.format("%05d.%s", i, FILE_EXTENSION));
}
public Set<String> getInternalFilenames() public Set<String> getInternalFilenames()
{ {
return internalFiles.keySet(); return internalFiles.keySet();
@ -155,8 +169,7 @@ public class FileSmoosher implements Closeable
// If current writer is in use then create a new SmooshedWriter which // If current writer is in use then create a new SmooshedWriter which
// writes into temporary file which is later merged into original // writes into temporary file which is later merged into original
// FileSmoosher. // FileSmoosher.
if (writerCurrentlyInUse) if (writerCurrentlyInUse) {
{
return delegateSmooshedWriter(name, size); return delegateSmooshedWriter(name, size);
} }
@ -251,10 +264,11 @@ public class FileSmoosher implements Closeable
// Get processed elements from the stack and write. // Get processed elements from the stack and write.
List<File> fileToProcess = new ArrayList<>(completedFiles); List<File> fileToProcess = new ArrayList<>(completedFiles);
completedFiles = Lists.newArrayList(); completedFiles = Lists.newArrayList();
for (File file: fileToProcess) for (File file : fileToProcess) {
{
add(file); add(file);
file.delete(); if (!file.delete()) {
LOG.warn("Unable to delete file [%s]", file);
}
} }
} }
@ -265,7 +279,9 @@ public class FileSmoosher implements Closeable
* *
* @param name fileName * @param name fileName
* @param size size of the file. * @param size size of the file.
*
* @return * @return
*
* @throws IOException * @throws IOException
*/ */
private SmooshedWriter delegateSmooshedWriter(final String name, final long size) throws IOException private SmooshedWriter delegateSmooshedWriter(final String name, final long size) throws IOException
@ -275,14 +291,17 @@ public class FileSmoosher implements Closeable
return new SmooshedWriter() return new SmooshedWriter()
{ {
private int currOffset = 0;
private final FileOutputStream out = new FileOutputStream(tmpFile); private final FileOutputStream out = new FileOutputStream(tmpFile);
private final GatheringByteChannel channel = out.getChannel();; private final GatheringByteChannel channel = out.getChannel();
private final Closer closer = Closer.create(); private final Closer closer = Closer.create();
private int currOffset = 0;
{ {
closer.register(out); closer.register(out);
closer.register(channel); closer.register(channel);
} }
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {
@ -294,6 +313,7 @@ public class FileSmoosher implements Closeable
mergeWithSmoosher(); mergeWithSmoosher();
} }
} }
public int bytesLeft() public int bytesLeft()
{ {
return (int) (size - currOffset); return (int) (size - currOffset);
@ -347,17 +367,21 @@ public class FileSmoosher implements Closeable
public void close() throws IOException public void close() throws IOException
{ {
//book keeping checks on created file. //book keeping checks on created file.
if (!completedFiles.isEmpty() || !filesInProcess.isEmpty()) if (!completedFiles.isEmpty() || !filesInProcess.isEmpty()) {
{ for (File file : completedFiles) {
for (File file: completedFiles) if (!file.delete()) {
{ LOG.warn("Unable to delete file [%s]", file);
file.delete(); }
} }
for (File file: filesInProcess) for (File file : filesInProcess) {
{ if (!file.delete()) {
file.delete(); LOG.warn("Unable to delete file [%s]", file);
}
} }
throw new ISE(String.format("%d writers needs to be closed before closing smoosher.", filesInProcess.size() + completedFiles.size())); throw new ISE(
"[%d] writers in progress and [%d] completed writers needs to be closed before closing smoosher.",
filesInProcess.size(), completedFiles.size()
);
} }
if (currOut != null) { if (currOut != null) {
@ -393,16 +417,6 @@ public class FileSmoosher implements Closeable
return new Outer(fileNum, new FileOutputStream(outFile), maxChunkSize); return new Outer(fileNum, new FileOutputStream(outFile), maxChunkSize);
} }
static File metaFile(File baseDir)
{
return new File(baseDir, String.format("meta.%s", FILE_EXTENSION));
}
static File makeChunkFile(File baseDir, int i)
{
return new File(baseDir, String.format("%05d.%s", i, FILE_EXTENSION));
}
public static class Outer implements SmooshedWriter public static class Outer implements SmooshedWriter
{ {
private final int fileNum; private final int fileNum;

View File

@ -23,7 +23,6 @@ import com.google.common.io.Files;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import io.druid.java.util.common.BufferUtils; import io.druid.java.util.common.BufferUtils;
import io.druid.java.util.common.ISE; import io.druid.java.util.common.ISE;
import io.druid.java.util.common.guava.CloseQuietly;
import junit.framework.Assert; import junit.framework.Assert;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
@ -60,23 +59,18 @@ public class SmooshedFileMapperTest
@Test @Test
public void testWhenFirstWriterClosedInTheMiddle() throws Exception public void testWhenFirstWriterClosedInTheMiddle() throws Exception
{ {
File baseDir = Files.createTempDir(); File baseDir = folder.newFolder("base");
File[] files = baseDir.listFiles();
Assert.assertNotNull(files);
Arrays.sort(files);
try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21)) try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21)) {
{
final SmooshedWriter writer = smoosher.addWithSmooshedWriter(String.format("%d", 19), 4); final SmooshedWriter writer = smoosher.addWithSmooshedWriter(String.format("%d", 19), 4);
for (int i = 0; i < 19; ++i) { for (int i = 0; i < 19; ++i) {
File tmpFile = File.createTempFile(String.format("smoosh-%s", i), ".bin"); File tmpFile = File.createTempFile(String.format("smoosh-%s", i), ".bin");
Files.write(Ints.toByteArray(i), tmpFile); Files.write(Ints.toByteArray(i), tmpFile);
smoosher.add(String.format("%d", i), tmpFile); smoosher.add(String.format("%d", i), tmpFile);
if (i==10) if (i == 10) {
{
writer.write(ByteBuffer.wrap(Ints.toByteArray(19))); writer.write(ByteBuffer.wrap(Ints.toByteArray(19)));
CloseQuietly.close(writer); writer.close();
} }
tmpFile.delete(); tmpFile.delete();
} }
@ -84,28 +78,25 @@ public class SmooshedFileMapperTest
validateOutput(baseDir); validateOutput(baseDir);
} }
@Test(expected= ISE.class) @Test(expected = ISE.class)
public void testExceptionForUnClosedFiles() throws Exception public void testExceptionForUnClosedFiles() throws Exception
{ {
File baseDir = Files.createTempDir(); File baseDir = folder.newFolder("base");
try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21)) try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21)) {
{
for (int i = 0; i < 19; ++i) { for (int i = 0; i < 19; ++i) {
final SmooshedWriter writer = smoosher.addWithSmooshedWriter(String.format("%d", i), 4); final SmooshedWriter writer = smoosher.addWithSmooshedWriter(String.format("%d", i), 4);
writer.write(ByteBuffer.wrap(Ints.toByteArray(i))); writer.write(ByteBuffer.wrap(Ints.toByteArray(i)));
} }
smoosher.close();
} }
} }
@Test @Test
public void testWhenFirstWriterClosedAtTheEnd() throws Exception public void testWhenFirstWriterClosedAtTheEnd() throws Exception
{ {
File baseDir = Files.createTempDir(); File baseDir = folder.newFolder("base");
try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21)) try (FileSmoosher smoosher = new FileSmoosher(baseDir, 21)) {
{
final SmooshedWriter writer = smoosher.addWithSmooshedWriter(String.format("%d", 19), 4); final SmooshedWriter writer = smoosher.addWithSmooshedWriter(String.format("%d", 19), 4);
writer.write(ByteBuffer.wrap(Ints.toByteArray(19))); writer.write(ByteBuffer.wrap(Ints.toByteArray(19)));
@ -115,8 +106,7 @@ public class SmooshedFileMapperTest
smoosher.add(String.format("%d", i), tmpFile); smoosher.add(String.format("%d", i), tmpFile);
tmpFile.delete(); tmpFile.delete();
} }
CloseQuietly.close(writer); writer.close();
smoosher.close();
} }
validateOutput(baseDir); validateOutput(baseDir);
} }
@ -170,7 +160,8 @@ public class SmooshedFileMapperTest
boolean exceptionThrown = false; boolean exceptionThrown = false;
try (final SmooshedWriter writer = smoosher.addWithSmooshedWriter("1", 2)) { try (final SmooshedWriter writer = smoosher.addWithSmooshedWriter("1", 2)) {
writer.write(ByteBuffer.wrap(Ints.toByteArray(1))); writer.write(ByteBuffer.wrap(Ints.toByteArray(1)));
} catch (ISE e) { }
catch (ISE e) {
Assert.assertTrue(e.getMessage().contains("Liar!!!")); Assert.assertTrue(e.getMessage().contains("Liar!!!"));
exceptionThrown = true; exceptionThrown = true;
} }