diff --git a/extensions-contrib/azure-extensions/src/main/java/io/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactory.java b/extensions-contrib/azure-extensions/src/main/java/io/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactory.java index e3ad8b432aa..345cffd512c 100644 --- a/extensions-contrib/azure-extensions/src/main/java/io/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactory.java +++ b/extensions-contrib/azure-extensions/src/main/java/io/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactory.java @@ -91,7 +91,7 @@ public class StaticAzureBlobStoreFirehoseFactory extends PrefetchableTextFilesFi @Override protected InputStream wrapObjectStream(AzureBlob object, InputStream stream) throws IOException { - return object.getPath().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream; + return CompressionUtils.decompress(stream, object.getPath()); } private static AzureByteSource makeByteSource(AzureStorage azureStorage, AzureBlob object) diff --git a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/StaticCloudFilesFirehoseFactory.java b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/StaticCloudFilesFirehoseFactory.java index 5f39e7e5a44..343635c4680 100644 --- a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/StaticCloudFilesFirehoseFactory.java +++ b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/StaticCloudFilesFirehoseFactory.java @@ -101,7 +101,7 @@ public class StaticCloudFilesFirehoseFactory extends PrefetchableTextFilesFireho @Override protected InputStream wrapObjectStream(CloudFilesBlob object, InputStream stream) throws IOException { - return object.getPath().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream; + return CompressionUtils.decompress(stream, object.getPath()); } @Override diff --git a/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactory.java b/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactory.java index 0d5d9995960..38fb8387088 100644 --- a/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactory.java +++ b/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactory.java @@ -93,7 +93,7 @@ public class StaticGoogleBlobStoreFirehoseFactory extends PrefetchableTextFilesF @Override protected InputStream wrapObjectStream(GoogleBlob object, InputStream stream) throws IOException { - return object.getPath().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream; + return CompressionUtils.decompress(stream, object.getPath()); } @Override diff --git a/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/UriCacheGenerator.java b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/UriCacheGenerator.java index 27fd2eacbdf..c2ca336c2ec 100644 --- a/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/UriCacheGenerator.java +++ b/extensions-core/lookups-cached-global/src/main/java/io/druid/server/lookup/namespace/UriCacheGenerator.java @@ -134,28 +134,14 @@ public final class UriCacheGenerator implements CacheGenerator { - final String s3Bucket = uri.getAuthority(); - final String key = S3Utils.extractS3Key(uri); - return S3Utils.getSingleObjectSummary(s3Client, s3Bucket, key); - } - ) - .collect(Collectors.toList()); + .map( + uri -> { + final String s3Bucket = uri.getAuthority(); + final String key = S3Utils.extractS3Key(uri); + return S3Utils.getSingleObjectSummary(s3Client, s3Bucket, key); + } + ) + .collect(Collectors.toList()); } else { final List objects = new ArrayList<>(); for (URI uri : prefixes) { @@ -212,7 +212,7 @@ public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactor @Override protected InputStream wrapObjectStream(S3ObjectSummary object, InputStream stream) throws IOException { - return object.getKey().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream; + return CompressionUtils.decompress(stream, object.getKey()); } @Override diff --git a/java-util/pom.xml b/java-util/pom.xml index 150c332a3ca..7f0b462d9c5 100644 --- a/java-util/pom.xml +++ b/java-util/pom.xml @@ -81,6 +81,14 @@ org.mozilla rhino + + org.apache.commons + commons-compress + + + org.tukaani + xz + com.jayway.jsonpath json-path diff --git a/java-util/src/main/java/io/druid/java/util/common/CompressionUtils.java b/java-util/src/main/java/io/druid/java/util/common/CompressionUtils.java index 876f26f2f58..c076ea6e8de 100644 --- a/java-util/src/main/java/io/druid/java/util/common/CompressionUtils.java +++ b/java-util/src/main/java/io/druid/java/util/common/CompressionUtils.java @@ -28,14 +28,18 @@ import com.google.common.io.ByteStreams; import com.google.common.io.Files; import io.druid.java.util.common.io.NativeIO; import io.druid.java.util.common.logger.Logger; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; +import org.apache.commons.compress.compressors.xz.XZCompressorInputStream; import java.io.BufferedInputStream; +import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileOutputStream; import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.charset.StandardCharsets; import java.util.Enumeration; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; @@ -48,7 +52,9 @@ public class CompressionUtils { private static final Logger log = new Logger(CompressionUtils.class); private static final int DEFAULT_RETRY_COUNT = 3; + private static final String BZ2_SUFFIX = ".bz2"; private static final String GZ_SUFFIX = ".gz"; + private static final String XZ_SUFFIX = ".xz"; private static final String ZIP_SUFFIX = ".zip"; /** @@ -313,7 +319,7 @@ public class CompressionUtils * * @return A GZIPInputStream that can handle concatenated gzip streams in the input */ - public static GZIPInputStream gzipInputStream(final InputStream in) throws IOException + private static GZIPInputStream gzipInputStream(final InputStream in) throws IOException { return new GZIPInputStream( new FilterInputStream(in) @@ -516,4 +522,42 @@ public class CompressionUtils } throw new IAE("[%s] is not a valid gz file name", fname); } + + /** + * Decompress an input stream from a file, based on the filename. + */ + public static InputStream decompress(final InputStream in, final String fileName) throws IOException + { + if (fileName.endsWith(GZ_SUFFIX)) { + return gzipInputStream(in); + } else if (fileName.endsWith(BZ2_SUFFIX)) { + return new BZip2CompressorInputStream(in, true); + } else if (fileName.endsWith(XZ_SUFFIX)) { + return new XZCompressorInputStream(in, true); + } else if (fileName.endsWith(ZIP_SUFFIX)) { + // This reads the first file in the archive. + final ZipInputStream zipIn = new ZipInputStream(in, StandardCharsets.UTF_8); + try { + final ZipEntry nextEntry = zipIn.getNextEntry(); + if (nextEntry == null) { + zipIn.close(); + + // No files in the archive - return an empty stream. + return new ByteArrayInputStream(new byte[0]); + } + return zipIn; + } + catch (IOException e) { + try { + zipIn.close(); + } + catch (IOException e2) { + e.addSuppressed(e2); + } + throw e; + } + } else { + return in; + } + } } diff --git a/java-util/src/test/java/io/druid/java/util/common/CompressionUtilsTest.java b/java-util/src/test/java/io/druid/java/util/common/CompressionUtilsTest.java index c00e75b9224..d8f878b9ed7 100644 --- a/java-util/src/test/java/io/druid/java/util/common/CompressionUtilsTest.java +++ b/java-util/src/test/java/io/druid/java/util/common/CompressionUtilsTest.java @@ -25,6 +25,8 @@ import com.google.common.io.ByteSink; import com.google.common.io.ByteSource; import com.google.common.io.ByteStreams; import com.google.common.io.Files; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; +import org.apache.commons.compress.compressors.xz.XZCompressorOutputStream; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -53,6 +55,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; public class CompressionUtilsTest { @@ -221,7 +225,6 @@ public class CompressionUtilsTest } } - @Test public void testGoodGzipByteSource() throws IOException { @@ -230,7 +233,7 @@ public class CompressionUtilsTest Assert.assertFalse(gzFile.exists()); CompressionUtils.gzip(Files.asByteSource(testFile), Files.asByteSink(gzFile), Predicates.alwaysTrue()); Assert.assertTrue(gzFile.exists()); - try (final InputStream inputStream = CompressionUtils.gzipInputStream(new FileInputStream(gzFile))) { + try (final InputStream inputStream = CompressionUtils.decompress(new FileInputStream(gzFile), gzFile.getName())) { assertGoodDataStream(inputStream); } if (!testFile.delete()) { @@ -244,6 +247,50 @@ public class CompressionUtilsTest } } + @Test + public void testDecompressBzip2() throws IOException + { + final File tmpDir = temporaryFolder.newFolder("testDecompressBzip2"); + final File bzFile = new File(tmpDir, testFile.getName() + ".bz2"); + Assert.assertFalse(bzFile.exists()); + try (final OutputStream out = new BZip2CompressorOutputStream(new FileOutputStream(bzFile))) { + ByteStreams.copy(new FileInputStream(testFile), out); + } + try (final InputStream inputStream = CompressionUtils.decompress(new FileInputStream(bzFile), bzFile.getName())) { + assertGoodDataStream(inputStream); + } + } + + @Test + public void testDecompressXz() throws IOException + { + final File tmpDir = temporaryFolder.newFolder("testDecompressXz"); + final File xzFile = new File(tmpDir, testFile.getName() + ".xz"); + Assert.assertFalse(xzFile.exists()); + try (final OutputStream out = new XZCompressorOutputStream(new FileOutputStream(xzFile))) { + ByteStreams.copy(new FileInputStream(testFile), out); + } + try (final InputStream inputStream = CompressionUtils.decompress(new FileInputStream(xzFile), xzFile.getName())) { + assertGoodDataStream(inputStream); + } + } + + @Test + public void testDecompressZip() throws IOException + { + final File tmpDir = temporaryFolder.newFolder("testDecompressZip"); + final File zipFile = new File(tmpDir, testFile.getName() + ".zip"); + Assert.assertFalse(zipFile.exists()); + try (final ZipOutputStream out = new ZipOutputStream(new FileOutputStream(zipFile))) { + out.putNextEntry(new ZipEntry("cool.file")); + ByteStreams.copy(new FileInputStream(testFile), out); + out.closeEntry(); + } + try (final InputStream inputStream = CompressionUtils.decompress(new FileInputStream(zipFile), zipFile.getName())) { + assertGoodDataStream(inputStream); + } + } + @Test public void testGoodGZStream() throws IOException { @@ -490,7 +537,7 @@ public class CompressionUtilsTest }, Predicates.alwaysTrue() ); Assert.assertTrue(gzFile.exists()); - try (final InputStream inputStream = CompressionUtils.gzipInputStream(new FileInputStream(gzFile))) { + try (final InputStream inputStream = CompressionUtils.decompress(new FileInputStream(gzFile), "file.gz")) { assertGoodDataStream(inputStream); } if (!testFile.delete()) { @@ -536,7 +583,7 @@ public class CompressionUtilsTest Assert.assertFalse(gzFile.exists()); CompressionUtils.gzip(Files.asByteSource(testFile), Files.asByteSink(gzFile), Predicates.alwaysTrue()); Assert.assertTrue(gzFile.exists()); - try (final InputStream inputStream = CompressionUtils.gzipInputStream(new FileInputStream(gzFile))) { + try (final InputStream inputStream = CompressionUtils.decompress(new FileInputStream(gzFile), "file.gz")) { assertGoodDataStream(inputStream); } if (testFile.exists() && !testFile.delete()) { diff --git a/pom.xml b/pom.xml index dc401857aa2..8fc12df9adc 100644 --- a/pom.xml +++ b/pom.xml @@ -325,6 +325,16 @@ rhino 1.7R5 + + org.apache.commons + commons-compress + 1.16 + + + org.tukaani + xz + 1.8 + com.fasterxml.jackson.core jackson-annotations diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/HttpFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/HttpFirehoseFactory.java index aaab6f9dae5..949cb1db7e4 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/HttpFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/HttpFirehoseFactory.java @@ -105,7 +105,7 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory @Override protected InputStream wrapObjectStream(File object, InputStream stream) throws IOException { - return object.getPath().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream; + return CompressionUtils.decompress(stream, object.getPath()); } }