CompressionUtils: Add support for decompressing xz, bz2, zip. (#5586)

Also switch various firehoses to the new method.

Fixes #5585.
This commit is contained in:
Gian Merlino 2018-04-06 08:06:45 -07:00 committed by Fangjin Yang
parent b86ed99d9a
commit 5ab17668c0
11 changed files with 136 additions and 41 deletions

View File

@ -91,7 +91,7 @@ public class StaticAzureBlobStoreFirehoseFactory extends PrefetchableTextFilesFi
@Override
protected InputStream wrapObjectStream(AzureBlob object, InputStream stream) throws IOException
{
return object.getPath().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream;
return CompressionUtils.decompress(stream, object.getPath());
}
private static AzureByteSource makeByteSource(AzureStorage azureStorage, AzureBlob object)

View File

@ -101,7 +101,7 @@ public class StaticCloudFilesFirehoseFactory extends PrefetchableTextFilesFireho
@Override
protected InputStream wrapObjectStream(CloudFilesBlob object, InputStream stream) throws IOException
{
return object.getPath().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream;
return CompressionUtils.decompress(stream, object.getPath());
}
@Override

View File

@ -93,7 +93,7 @@ public class StaticGoogleBlobStoreFirehoseFactory extends PrefetchableTextFilesF
@Override
protected InputStream wrapObjectStream(GoogleBlob object, InputStream stream) throws IOException
{
return object.getPath().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream;
return CompressionUtils.decompress(stream, object.getPath());
}
@Override

View File

@ -134,28 +134,14 @@ public final class UriCacheGenerator implements CacheGenerator<UriExtractionName
catch (NumberFormatException ex) {
log.debug(ex, "Failed to get last modified timestamp. Assuming no timestamp");
}
final ByteSource source;
if (CompressionUtils.isGz(uriPath)) {
// Simple gzip stream
log.debug("Loading gz");
source = new ByteSource()
final ByteSource source = new ByteSource()
{
@Override
public InputStream openStream() throws IOException
{
@Override
public InputStream openStream() throws IOException
{
return CompressionUtils.gzipInputStream(puller.getInputStream(uri));
}
};
} else {
source = new ByteSource()
{
@Override
public InputStream openStream() throws IOException
{
return puller.getInputStream(uri);
}
};
}
return CompressionUtils.decompress(puller.getInputStream(uri), uri.getPath());
}
};
final CacheScheduler.VersionedCache versionedCache = scheduler.createVersionedCache(entryId, version);
try {

View File

@ -114,14 +114,14 @@ public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactor
// Getting data is deferred until openObjectStream() is called for each object.
if (!uris.isEmpty()) {
return uris.stream()
.map(
uri -> {
final String s3Bucket = uri.getAuthority();
final String key = S3Utils.extractS3Key(uri);
return S3Utils.getSingleObjectSummary(s3Client, s3Bucket, key);
}
)
.collect(Collectors.toList());
.map(
uri -> {
final String s3Bucket = uri.getAuthority();
final String key = S3Utils.extractS3Key(uri);
return S3Utils.getSingleObjectSummary(s3Client, s3Bucket, key);
}
)
.collect(Collectors.toList());
} else {
final List<S3ObjectSummary> objects = new ArrayList<>();
for (URI uri : prefixes) {
@ -212,7 +212,7 @@ public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactor
@Override
protected InputStream wrapObjectStream(S3ObjectSummary object, InputStream stream) throws IOException
{
return object.getKey().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream;
return CompressionUtils.decompress(stream, object.getKey());
}
@Override

View File

@ -81,6 +81,14 @@
<groupId>org.mozilla</groupId>
<artifactId>rhino</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</dependency>
<dependency>
<groupId>org.tukaani</groupId>
<artifactId>xz</artifactId>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>

View File

@ -28,14 +28,18 @@ import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import io.druid.java.util.common.io.NativeIO;
import io.druid.java.util.common.logger.Logger;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.xz.XZCompressorInputStream;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Enumeration;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
@ -48,7 +52,9 @@ public class CompressionUtils
{
private static final Logger log = new Logger(CompressionUtils.class);
private static final int DEFAULT_RETRY_COUNT = 3;
private static final String BZ2_SUFFIX = ".bz2";
private static final String GZ_SUFFIX = ".gz";
private static final String XZ_SUFFIX = ".xz";
private static final String ZIP_SUFFIX = ".zip";
/**
@ -313,7 +319,7 @@ public class CompressionUtils
*
* @return A GZIPInputStream that can handle concatenated gzip streams in the input
*/
public static GZIPInputStream gzipInputStream(final InputStream in) throws IOException
private static GZIPInputStream gzipInputStream(final InputStream in) throws IOException
{
return new GZIPInputStream(
new FilterInputStream(in)
@ -516,4 +522,42 @@ public class CompressionUtils
}
throw new IAE("[%s] is not a valid gz file name", fname);
}
/**
* Decompress an input stream from a file, based on the filename.
*/
public static InputStream decompress(final InputStream in, final String fileName) throws IOException
{
if (fileName.endsWith(GZ_SUFFIX)) {
return gzipInputStream(in);
} else if (fileName.endsWith(BZ2_SUFFIX)) {
return new BZip2CompressorInputStream(in, true);
} else if (fileName.endsWith(XZ_SUFFIX)) {
return new XZCompressorInputStream(in, true);
} else if (fileName.endsWith(ZIP_SUFFIX)) {
// This reads the first file in the archive.
final ZipInputStream zipIn = new ZipInputStream(in, StandardCharsets.UTF_8);
try {
final ZipEntry nextEntry = zipIn.getNextEntry();
if (nextEntry == null) {
zipIn.close();
// No files in the archive - return an empty stream.
return new ByteArrayInputStream(new byte[0]);
}
return zipIn;
}
catch (IOException e) {
try {
zipIn.close();
}
catch (IOException e2) {
e.addSuppressed(e2);
}
throw e;
}
} else {
return in;
}
}
}

View File

@ -25,6 +25,8 @@ import com.google.common.io.ByteSink;
import com.google.common.io.ByteSource;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
import org.apache.commons.compress.compressors.xz.XZCompressorOutputStream;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@ -53,6 +55,8 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
public class CompressionUtilsTest
{
@ -221,7 +225,6 @@ public class CompressionUtilsTest
}
}
@Test
public void testGoodGzipByteSource() throws IOException
{
@ -230,7 +233,7 @@ public class CompressionUtilsTest
Assert.assertFalse(gzFile.exists());
CompressionUtils.gzip(Files.asByteSource(testFile), Files.asByteSink(gzFile), Predicates.<Throwable>alwaysTrue());
Assert.assertTrue(gzFile.exists());
try (final InputStream inputStream = CompressionUtils.gzipInputStream(new FileInputStream(gzFile))) {
try (final InputStream inputStream = CompressionUtils.decompress(new FileInputStream(gzFile), gzFile.getName())) {
assertGoodDataStream(inputStream);
}
if (!testFile.delete()) {
@ -244,6 +247,50 @@ public class CompressionUtilsTest
}
}
@Test
public void testDecompressBzip2() throws IOException
{
final File tmpDir = temporaryFolder.newFolder("testDecompressBzip2");
final File bzFile = new File(tmpDir, testFile.getName() + ".bz2");
Assert.assertFalse(bzFile.exists());
try (final OutputStream out = new BZip2CompressorOutputStream(new FileOutputStream(bzFile))) {
ByteStreams.copy(new FileInputStream(testFile), out);
}
try (final InputStream inputStream = CompressionUtils.decompress(new FileInputStream(bzFile), bzFile.getName())) {
assertGoodDataStream(inputStream);
}
}
@Test
public void testDecompressXz() throws IOException
{
final File tmpDir = temporaryFolder.newFolder("testDecompressXz");
final File xzFile = new File(tmpDir, testFile.getName() + ".xz");
Assert.assertFalse(xzFile.exists());
try (final OutputStream out = new XZCompressorOutputStream(new FileOutputStream(xzFile))) {
ByteStreams.copy(new FileInputStream(testFile), out);
}
try (final InputStream inputStream = CompressionUtils.decompress(new FileInputStream(xzFile), xzFile.getName())) {
assertGoodDataStream(inputStream);
}
}
@Test
public void testDecompressZip() throws IOException
{
final File tmpDir = temporaryFolder.newFolder("testDecompressZip");
final File zipFile = new File(tmpDir, testFile.getName() + ".zip");
Assert.assertFalse(zipFile.exists());
try (final ZipOutputStream out = new ZipOutputStream(new FileOutputStream(zipFile))) {
out.putNextEntry(new ZipEntry("cool.file"));
ByteStreams.copy(new FileInputStream(testFile), out);
out.closeEntry();
}
try (final InputStream inputStream = CompressionUtils.decompress(new FileInputStream(zipFile), zipFile.getName())) {
assertGoodDataStream(inputStream);
}
}
@Test
public void testGoodGZStream() throws IOException
{
@ -490,7 +537,7 @@ public class CompressionUtilsTest
}, Predicates.<Throwable>alwaysTrue()
);
Assert.assertTrue(gzFile.exists());
try (final InputStream inputStream = CompressionUtils.gzipInputStream(new FileInputStream(gzFile))) {
try (final InputStream inputStream = CompressionUtils.decompress(new FileInputStream(gzFile), "file.gz")) {
assertGoodDataStream(inputStream);
}
if (!testFile.delete()) {
@ -536,7 +583,7 @@ public class CompressionUtilsTest
Assert.assertFalse(gzFile.exists());
CompressionUtils.gzip(Files.asByteSource(testFile), Files.asByteSink(gzFile), Predicates.<Throwable>alwaysTrue());
Assert.assertTrue(gzFile.exists());
try (final InputStream inputStream = CompressionUtils.gzipInputStream(new FileInputStream(gzFile))) {
try (final InputStream inputStream = CompressionUtils.decompress(new FileInputStream(gzFile), "file.gz")) {
assertGoodDataStream(inputStream);
}
if (testFile.exists() && !testFile.delete()) {

10
pom.xml
View File

@ -325,6 +325,16 @@
<artifactId>rhino</artifactId>
<version>1.7R5</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.16</version>
</dependency>
<dependency>
<groupId>org.tukaani</groupId>
<artifactId>xz</artifactId>
<version>1.8</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>

View File

@ -105,7 +105,7 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
@Override
protected InputStream wrapObjectStream(URI object, InputStream stream) throws IOException
{
return object.getPath().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream;
return CompressionUtils.decompress(stream, object.getPath());
}
@Override

View File

@ -22,10 +22,10 @@ package io.druid.segment.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.data.input.impl.AbstractTextFilesFirehoseFactory;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.java.util.common.CompressionUtils;
import io.druid.java.util.emitter.EmittingLogger;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.TrueFileFilter;
import org.apache.commons.io.filefilter.WildcardFileFilter;
@ -97,6 +97,6 @@ public class LocalFirehoseFactory extends AbstractTextFilesFirehoseFactory<File>
@Override
protected InputStream wrapObjectStream(File object, InputStream stream) throws IOException
{
return object.getPath().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream;
return CompressionUtils.decompress(stream, object.getPath());
}
}