mirror of https://github.com/apache/druid.git
Merge pull request #1685 from gianm/close-loudly
Close output streams and channels loudly when creating segments.
This commit is contained in:
commit
04ff6cd355
|
@ -45,7 +45,6 @@ import com.metamx.collections.spatial.RTree;
|
||||||
import com.metamx.collections.spatial.split.LinearGutmanSplitStrategy;
|
import com.metamx.collections.spatial.split.LinearGutmanSplitStrategy;
|
||||||
import com.metamx.common.IAE;
|
import com.metamx.common.IAE;
|
||||||
import com.metamx.common.ISE;
|
import com.metamx.common.ISE;
|
||||||
import com.metamx.common.guava.CloseQuietly;
|
|
||||||
import com.metamx.common.guava.FunctionalIterable;
|
import com.metamx.common.guava.FunctionalIterable;
|
||||||
import com.metamx.common.guava.MergeIterable;
|
import com.metamx.common.guava.MergeIterable;
|
||||||
import com.metamx.common.guava.nary.BinaryFn;
|
import com.metamx.common.guava.nary.BinaryFn;
|
||||||
|
@ -522,11 +521,8 @@ public class IndexMerger
|
||||||
long startTime = System.currentTimeMillis();
|
long startTime = System.currentTimeMillis();
|
||||||
File indexFile = new File(v8OutDir, "index.drd");
|
File indexFile = new File(v8OutDir, "index.drd");
|
||||||
|
|
||||||
FileOutputStream fileOutputStream = null;
|
try (FileOutputStream fileOutputStream = new FileOutputStream(indexFile);
|
||||||
FileChannel channel = null;
|
FileChannel channel = fileOutputStream.getChannel()) {
|
||||||
try {
|
|
||||||
fileOutputStream = new FileOutputStream(indexFile);
|
|
||||||
channel = fileOutputStream.getChannel();
|
|
||||||
channel.write(ByteBuffer.wrap(new byte[]{IndexIO.V8_VERSION}));
|
channel.write(ByteBuffer.wrap(new byte[]{IndexIO.V8_VERSION}));
|
||||||
|
|
||||||
GenericIndexed.fromIterable(mergedDimensions, GenericIndexed.STRING_STRATEGY).writeToChannel(channel);
|
GenericIndexed.fromIterable(mergedDimensions, GenericIndexed.STRING_STRATEGY).writeToChannel(channel);
|
||||||
|
@ -544,12 +540,6 @@ public class IndexMerger
|
||||||
serializerUtils.writeString(channel, String.format("%s/%s", minTime, maxTime));
|
serializerUtils.writeString(channel, String.format("%s/%s", minTime, maxTime));
|
||||||
serializerUtils.writeString(channel, mapper.writeValueAsString(indexSpec.getBitmapSerdeFactory()));
|
serializerUtils.writeString(channel, mapper.writeValueAsString(indexSpec.getBitmapSerdeFactory()));
|
||||||
}
|
}
|
||||||
finally {
|
|
||||||
CloseQuietly.close(channel);
|
|
||||||
channel = null;
|
|
||||||
CloseQuietly.close(fileOutputStream);
|
|
||||||
fileOutputStream = null;
|
|
||||||
}
|
|
||||||
IndexIO.checkFileSize(indexFile);
|
IndexIO.checkFileSize(indexFile);
|
||||||
log.info("outDir[%s] completed index.drd in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime);
|
log.info("outDir[%s] completed index.drd in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime);
|
||||||
|
|
||||||
|
@ -994,9 +984,7 @@ public class IndexMerger
|
||||||
{
|
{
|
||||||
File indexFile = new File(inDir, "index.drd");
|
File indexFile = new File(inDir, "index.drd");
|
||||||
|
|
||||||
FileChannel channel = null;
|
try (FileChannel channel = new FileOutputStream(indexFile).getChannel()) {
|
||||||
try {
|
|
||||||
channel = new FileOutputStream(indexFile).getChannel();
|
|
||||||
channel.write(ByteBuffer.wrap(new byte[]{versionId}));
|
channel.write(ByteBuffer.wrap(new byte[]{versionId}));
|
||||||
|
|
||||||
availableDimensions.writeToChannel(channel);
|
availableDimensions.writeToChannel(channel);
|
||||||
|
@ -1008,10 +996,6 @@ public class IndexMerger
|
||||||
channel, mapper.writeValueAsString(bitmapSerdeFactory)
|
channel, mapper.writeValueAsString(bitmapSerdeFactory)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
finally {
|
|
||||||
CloseQuietly.close(channel);
|
|
||||||
channel = null;
|
|
||||||
}
|
|
||||||
IndexIO.checkFileSize(indexFile);
|
IndexIO.checkFileSize(indexFile);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1310,28 +1294,14 @@ public class IndexMerger
|
||||||
|
|
||||||
private static void writeMetadataToFile(File metadataFile, Map<String, Object> metadata) throws IOException
|
private static void writeMetadataToFile(File metadataFile, Map<String, Object> metadata) throws IOException
|
||||||
{
|
{
|
||||||
FileOutputStream metadataFileOutputStream = null;
|
try (FileOutputStream metadataFileOutputStream = new FileOutputStream(metadataFile);
|
||||||
FileChannel metadataFilechannel = null;
|
FileChannel metadataFilechannel = metadataFileOutputStream.getChannel()
|
||||||
try {
|
) {
|
||||||
metadataFileOutputStream = new FileOutputStream(metadataFile);
|
|
||||||
metadataFilechannel = metadataFileOutputStream.getChannel();
|
|
||||||
|
|
||||||
byte[] metadataBytes = mapper.writeValueAsBytes(metadata);
|
byte[] metadataBytes = mapper.writeValueAsBytes(metadata);
|
||||||
if (metadataBytes.length != metadataFilechannel.write(ByteBuffer.wrap(metadataBytes))) {
|
if (metadataBytes.length != metadataFilechannel.write(ByteBuffer.wrap(metadataBytes))) {
|
||||||
throw new IOException("Failed to write metadata for file");
|
throw new IOException("Failed to write metadata for file");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
finally {
|
|
||||||
if (metadataFilechannel != null) {
|
|
||||||
metadataFilechannel.close();
|
|
||||||
metadataFilechannel = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (metadataFileOutputStream != null) {
|
|
||||||
metadataFileOutputStream.close();
|
|
||||||
metadataFileOutputStream = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
IndexIO.checkFileSize(metadataFile);
|
IndexIO.checkFileSize(metadataFile);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,7 +22,6 @@ import com.google.common.io.InputSupplier;
|
||||||
import com.google.common.io.OutputSupplier;
|
import com.google.common.io.OutputSupplier;
|
||||||
import com.metamx.common.IAE;
|
import com.metamx.common.IAE;
|
||||||
import com.metamx.common.ISE;
|
import com.metamx.common.ISE;
|
||||||
import com.metamx.common.guava.CloseQuietly;
|
|
||||||
import io.druid.common.utils.SerializerUtils;
|
import io.druid.common.utils.SerializerUtils;
|
||||||
import io.druid.segment.data.CompressedFloatsIndexedSupplier;
|
import io.druid.segment.data.CompressedFloatsIndexedSupplier;
|
||||||
import io.druid.segment.data.CompressedFloatsSupplierSerializer;
|
import io.druid.segment.data.CompressedFloatsSupplierSerializer;
|
||||||
|
@ -69,24 +68,15 @@ public class MetricHolder
|
||||||
OutputSupplier<? extends OutputStream> outSupplier, String name, String typeName, GenericIndexedWriter column
|
OutputSupplier<? extends OutputStream> outSupplier, String name, String typeName, GenericIndexedWriter column
|
||||||
) throws IOException
|
) throws IOException
|
||||||
{
|
{
|
||||||
OutputStream out = null;
|
try (OutputStream out = outSupplier.getOutput()) {
|
||||||
InputStream in = null;
|
|
||||||
|
|
||||||
try {
|
|
||||||
out = outSupplier.getOutput();
|
|
||||||
|
|
||||||
out.write(version);
|
out.write(version);
|
||||||
serializerUtils.writeString(out, name);
|
serializerUtils.writeString(out, name);
|
||||||
serializerUtils.writeString(out, typeName);
|
serializerUtils.writeString(out, typeName);
|
||||||
|
|
||||||
final InputSupplier<InputStream> supplier = column.combineStreams();
|
final InputSupplier<InputStream> supplier = column.combineStreams();
|
||||||
in = supplier.getInput();
|
try (InputStream in = supplier.getInput()) {
|
||||||
|
|
||||||
ByteStreams.copy(in, out);
|
ByteStreams.copy(in, out);
|
||||||
}
|
}
|
||||||
finally {
|
|
||||||
CloseQuietly.close(out);
|
|
||||||
CloseQuietly.close(in);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue