Close output streams and channels loudly when creating segments.

This commit is contained in:
Gian Merlino 2015-08-28 17:14:03 -07:00
parent ceaa49ec4f
commit 7d6fa2ba50
2 changed files with 11 additions and 51 deletions

View File

@ -45,7 +45,6 @@ import com.metamx.collections.spatial.RTree;
import com.metamx.collections.spatial.split.LinearGutmanSplitStrategy; import com.metamx.collections.spatial.split.LinearGutmanSplitStrategy;
import com.metamx.common.IAE; import com.metamx.common.IAE;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.guava.MergeIterable; import com.metamx.common.guava.MergeIterable;
import com.metamx.common.guava.nary.BinaryFn; import com.metamx.common.guava.nary.BinaryFn;
@ -522,11 +521,8 @@ public class IndexMerger
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
File indexFile = new File(v8OutDir, "index.drd"); File indexFile = new File(v8OutDir, "index.drd");
FileOutputStream fileOutputStream = null; try (FileOutputStream fileOutputStream = new FileOutputStream(indexFile);
FileChannel channel = null; FileChannel channel = fileOutputStream.getChannel()) {
try {
fileOutputStream = new FileOutputStream(indexFile);
channel = fileOutputStream.getChannel();
channel.write(ByteBuffer.wrap(new byte[]{IndexIO.V8_VERSION})); channel.write(ByteBuffer.wrap(new byte[]{IndexIO.V8_VERSION}));
GenericIndexed.fromIterable(mergedDimensions, GenericIndexed.STRING_STRATEGY).writeToChannel(channel); GenericIndexed.fromIterable(mergedDimensions, GenericIndexed.STRING_STRATEGY).writeToChannel(channel);
@ -544,12 +540,6 @@ public class IndexMerger
serializerUtils.writeString(channel, String.format("%s/%s", minTime, maxTime)); serializerUtils.writeString(channel, String.format("%s/%s", minTime, maxTime));
serializerUtils.writeString(channel, mapper.writeValueAsString(indexSpec.getBitmapSerdeFactory())); serializerUtils.writeString(channel, mapper.writeValueAsString(indexSpec.getBitmapSerdeFactory()));
} }
finally {
CloseQuietly.close(channel);
channel = null;
CloseQuietly.close(fileOutputStream);
fileOutputStream = null;
}
IndexIO.checkFileSize(indexFile); IndexIO.checkFileSize(indexFile);
log.info("outDir[%s] completed index.drd in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime); log.info("outDir[%s] completed index.drd in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime);
@ -928,7 +918,7 @@ public class IndexMerger
); );
if (segmentMetadata != null && !segmentMetadata.isEmpty()) { if (segmentMetadata != null && !segmentMetadata.isEmpty()) {
writeMetadataToFile( new File(v8OutDir, "metadata.drd"), segmentMetadata); writeMetadataToFile(new File(v8OutDir, "metadata.drd"), segmentMetadata);
log.info("wrote metadata.drd in outDir[%s].", v8OutDir); log.info("wrote metadata.drd in outDir[%s].", v8OutDir);
expectedFiles.add("metadata.drd"); expectedFiles.add("metadata.drd");
@ -994,9 +984,7 @@ public class IndexMerger
{ {
File indexFile = new File(inDir, "index.drd"); File indexFile = new File(inDir, "index.drd");
FileChannel channel = null; try (FileChannel channel = new FileOutputStream(indexFile).getChannel()) {
try {
channel = new FileOutputStream(indexFile).getChannel();
channel.write(ByteBuffer.wrap(new byte[]{versionId})); channel.write(ByteBuffer.wrap(new byte[]{versionId}));
availableDimensions.writeToChannel(channel); availableDimensions.writeToChannel(channel);
@ -1008,10 +996,6 @@ public class IndexMerger
channel, mapper.writeValueAsString(bitmapSerdeFactory) channel, mapper.writeValueAsString(bitmapSerdeFactory)
); );
} }
finally {
CloseQuietly.close(channel);
channel = null;
}
IndexIO.checkFileSize(indexFile); IndexIO.checkFileSize(indexFile);
} }
@ -1310,28 +1294,14 @@ public class IndexMerger
private static void writeMetadataToFile(File metadataFile, Map<String, Object> metadata) throws IOException private static void writeMetadataToFile(File metadataFile, Map<String, Object> metadata) throws IOException
{ {
FileOutputStream metadataFileOutputStream = null; try (FileOutputStream metadataFileOutputStream = new FileOutputStream(metadataFile);
FileChannel metadataFilechannel = null; FileChannel metadataFilechannel = metadataFileOutputStream.getChannel()
try { ) {
metadataFileOutputStream = new FileOutputStream(metadataFile);
metadataFilechannel = metadataFileOutputStream.getChannel();
byte[] metadataBytes = mapper.writeValueAsBytes(metadata); byte[] metadataBytes = mapper.writeValueAsBytes(metadata);
if (metadataBytes.length != metadataFilechannel.write(ByteBuffer.wrap(metadataBytes))) { if (metadataBytes.length != metadataFilechannel.write(ByteBuffer.wrap(metadataBytes))) {
throw new IOException("Failed to write metadata for file"); throw new IOException("Failed to write metadata for file");
} }
} }
finally {
if (metadataFilechannel != null) {
metadataFilechannel.close();
metadataFilechannel = null;
}
if (metadataFileOutputStream != null) {
metadataFileOutputStream.close();
metadataFileOutputStream = null;
}
}
IndexIO.checkFileSize(metadataFile); IndexIO.checkFileSize(metadataFile);
} }
} }

View File

@ -22,7 +22,6 @@ import com.google.common.io.InputSupplier;
import com.google.common.io.OutputSupplier; import com.google.common.io.OutputSupplier;
import com.metamx.common.IAE; import com.metamx.common.IAE;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.guava.CloseQuietly;
import io.druid.common.utils.SerializerUtils; import io.druid.common.utils.SerializerUtils;
import io.druid.segment.data.CompressedFloatsIndexedSupplier; import io.druid.segment.data.CompressedFloatsIndexedSupplier;
import io.druid.segment.data.CompressedFloatsSupplierSerializer; import io.druid.segment.data.CompressedFloatsSupplierSerializer;
@ -69,24 +68,15 @@ public class MetricHolder
OutputSupplier<? extends OutputStream> outSupplier, String name, String typeName, GenericIndexedWriter column OutputSupplier<? extends OutputStream> outSupplier, String name, String typeName, GenericIndexedWriter column
) throws IOException ) throws IOException
{ {
OutputStream out = null; try (OutputStream out = outSupplier.getOutput()) {
InputStream in = null;
try {
out = outSupplier.getOutput();
out.write(version); out.write(version);
serializerUtils.writeString(out, name); serializerUtils.writeString(out, name);
serializerUtils.writeString(out, typeName); serializerUtils.writeString(out, typeName);
final InputSupplier<InputStream> supplier = column.combineStreams(); final InputSupplier<InputStream> supplier = column.combineStreams();
in = supplier.getInput(); try (InputStream in = supplier.getInput()) {
ByteStreams.copy(in, out);
ByteStreams.copy(in, out); }
}
finally {
CloseQuietly.close(out);
CloseQuietly.close(in);
} }
} }