Merge pull request #1241 from infynyxx/hdfs_druid_remove_deprecated_code

use ByteSink and ByteSource instead of OutputSupplier and InputSupplier
This commit is contained in:
Xavier Léauté 2015-03-26 12:05:09 -07:00
commit 85174364ae
1 changed files with 8 additions and 9 deletions

View File

@ -19,8 +19,8 @@ package io.druid.storage.hdfs;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import com.google.common.io.OutputSupplier;
import com.google.common.io.ByteSink;
import com.google.common.io.ByteSource;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.segment.SegmentUtils;
@ -75,7 +75,7 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
fs.mkdirs(outFile.getParent());
log.info("Compressing files from[%s] to [%s]", inDir, outFile);
long size;
final long size;
try (FSDataOutputStream out = fs.create(outFile)) {
size = CompressionUtils.zip(inDir, out);
}
@ -93,10 +93,9 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
{
final Path descriptorFile = new Path(outDir, "descriptor.json");
log.info("Creating descriptor file at[%s]", descriptorFile);
ByteStreams.copy(
ByteStreams.newInputStreamSupplier(jsonMapper.writeValueAsBytes(segment)),
new HdfsOutputStreamSupplier(fs, descriptorFile)
);
ByteSource
.wrap(jsonMapper.writeValueAsBytes(segment))
.copyTo(new HdfsOutputStreamSupplier(fs, descriptorFile));
return segment;
}
@ -105,7 +104,7 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
return ImmutableMap.<String, Object>of("type", "hdfs", "path", outFile.toString());
}
private static class HdfsOutputStreamSupplier implements OutputSupplier<OutputStream>
private static class HdfsOutputStreamSupplier extends ByteSink
{
private final FileSystem fs;
private final Path descriptorFile;
@ -117,7 +116,7 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
}
@Override
public OutputStream getOutput() throws IOException
public OutputStream openStream() throws IOException
{
return fs.create(descriptorFile);
}