From 76e77cb61030fa78ef47e5e47f67751a73ffed1c Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Wed, 5 Oct 2016 15:25:03 -0700 Subject: [PATCH] Make segment creation gauva 14 friendly (#3520) --- .../java/io/druid/segment/MetricHolder.java | 22 ++++++++++++++----- .../segment/StringDimensionMergerLegacy.java | 19 +++++++++++----- .../segment/StringDimensionMergerV9.java | 6 ++--- .../BlockLayoutFloatSupplierSerializer.java | 4 ++-- .../BlockLayoutLongSupplierSerializer.java | 4 ++-- .../segment/data/GenericIndexedWriter.java | 19 ++++++++-------- 6 files changed, 48 insertions(+), 26 deletions(-) diff --git a/processing/src/main/java/io/druid/segment/MetricHolder.java b/processing/src/main/java/io/druid/segment/MetricHolder.java index 524b3df6a9d..02c23759d6f 100644 --- a/processing/src/main/java/io/druid/segment/MetricHolder.java +++ b/processing/src/main/java/io/druid/segment/MetricHolder.java @@ -84,12 +84,12 @@ public class MetricHolder } public static void writeFloatMetric( - ByteSink outSupplier, String name, FloatSupplierSerializer column + final ByteSink outSupplier, String name, FloatSupplierSerializer column ) throws IOException { outSupplier.write(version); - serializerUtils.writeString(outSupplier, name); - serializerUtils.writeString(outSupplier, "float"); + serializerUtils.writeString(toOutputSupplier(outSupplier), name); + serializerUtils.writeString(toOutputSupplier(outSupplier), "float"); column.closeAndConsolidate(outSupplier); } @@ -98,8 +98,8 @@ public class MetricHolder ) throws IOException { outSupplier.write(version); - serializerUtils.writeString(outSupplier, name); - serializerUtils.writeString(outSupplier, "long"); + serializerUtils.writeString(toOutputSupplier(outSupplier), name); + serializerUtils.writeString(toOutputSupplier(outSupplier), "long"); column.closeAndConsolidate(outSupplier); } @@ -164,6 +164,18 @@ public class MetricHolder return holder; } + // This is only for guava14 compat. Eventually it should be able to be removed. + private static OutputSupplier toOutputSupplier(final ByteSink sink) { + return new OutputSupplier() + { + @Override + public OutputStream getOutput() throws IOException + { + return sink.openStream(); + } + }; + } + private final String name; private final String typeName; private final MetricType type; diff --git a/processing/src/main/java/io/druid/segment/StringDimensionMergerLegacy.java b/processing/src/main/java/io/druid/segment/StringDimensionMergerLegacy.java index b7221a8d321..488d5d9d4f1 100644 --- a/processing/src/main/java/io/druid/segment/StringDimensionMergerLegacy.java +++ b/processing/src/main/java/io/druid/segment/StringDimensionMergerLegacy.java @@ -54,6 +54,7 @@ import java.io.Closeable; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.nio.IntBuffer; import java.nio.MappedByteBuffer; import java.util.List; @@ -197,7 +198,7 @@ public class StringDimensionMergerLegacy extends StringDimensionMergerV9 impleme } @Override - public void writeValueMetadataToFile(FileOutputSupplier valueEncodingFile) throws IOException + public void writeValueMetadataToFile(final FileOutputSupplier valueEncodingFile) throws IOException { final SerializerUtils serializerUtils = new SerializerUtils(); @@ -218,15 +219,23 @@ public class StringDimensionMergerLegacy extends StringDimensionMergerV9 impleme @Override public void writeIndexesToFiles( - ByteSink invertedIndexFile, - OutputSupplier spatialIndexFile + final ByteSink invertedIndexFile, + final OutputSupplier spatialIndexFile ) throws IOException { final SerializerUtils serializerUtils = new SerializerUtils(); + final OutputSupplier invertedIndexOutputSupplier = new OutputSupplier() + { + @Override + public OutputStream getOutput() throws IOException + { + return invertedIndexFile.openStream(); + } + }; bitmapWriter.close(); - serializerUtils.writeString(invertedIndexFile, dimensionName); - ByteStreams.copy(bitmapWriter.combineStreams(), invertedIndexFile); + serializerUtils.writeString(invertedIndexOutputSupplier, dimensionName); + ByteStreams.copy(bitmapWriter.combineStreams(), invertedIndexOutputSupplier); if (capabilities.hasSpatialIndexes()) { diff --git a/processing/src/main/java/io/druid/segment/StringDimensionMergerV9.java b/processing/src/main/java/io/druid/segment/StringDimensionMergerV9.java index 79cf76a196a..8d9e5c11794 100644 --- a/processing/src/main/java/io/druid/segment/StringDimensionMergerV9.java +++ b/processing/src/main/java/io/druid/segment/StringDimensionMergerV9.java @@ -289,9 +289,9 @@ public class StringDimensionMergerV9 implements DimensionMergerV9 // write dim values to one single file because we need to read it File dimValueFile = IndexIO.makeDimFile(outDir, dimensionName); - FileOutputStream fos = new FileOutputStream(dimValueFile); - ByteStreams.copy(dictionaryWriter.combineStreams(), fos); - fos.close(); + try(FileOutputStream fos = new FileOutputStream(dimValueFile)) { + ByteStreams.copy(dictionaryWriter.combineStreams(), fos); + } final MappedByteBuffer dimValsMapped = Files.map(dimValueFile); try (Closeable dimValsMappedUnmapper = new Closeable() diff --git a/processing/src/main/java/io/druid/segment/data/BlockLayoutFloatSupplierSerializer.java b/processing/src/main/java/io/druid/segment/data/BlockLayoutFloatSupplierSerializer.java index c8d47d15076..1737faf814b 100644 --- a/processing/src/main/java/io/druid/segment/data/BlockLayoutFloatSupplierSerializer.java +++ b/processing/src/main/java/io/druid/segment/data/BlockLayoutFloatSupplierSerializer.java @@ -102,7 +102,7 @@ public class BlockLayoutFloatSupplierSerializer implements FloatSupplierSerializ try (OutputStream out = consolidatedOut.openStream(); InputStream meta = ioPeon.makeInputStream(metaFile)) { ByteStreams.copy(meta, out); - flattener.combineStreams().copyTo(out); + ByteStreams.copy(flattener.combineStreams(), out); } } @@ -135,7 +135,7 @@ public class BlockLayoutFloatSupplierSerializer implements FloatSupplierSerializ public void writeToChannel(WritableByteChannel channel) throws IOException { try (InputStream meta = ioPeon.makeInputStream(metaFile); - InputStream input = flattener.combineStreams().openStream()) { + InputStream input = flattener.combineStreams().getInput()) { ByteStreams.copy(Channels.newChannel(meta), channel); final ReadableByteChannel from = Channels.newChannel(input); ByteStreams.copy(from, channel); diff --git a/processing/src/main/java/io/druid/segment/data/BlockLayoutLongSupplierSerializer.java b/processing/src/main/java/io/druid/segment/data/BlockLayoutLongSupplierSerializer.java index 6554ba50878..753b4c58cc1 100644 --- a/processing/src/main/java/io/druid/segment/data/BlockLayoutLongSupplierSerializer.java +++ b/processing/src/main/java/io/druid/segment/data/BlockLayoutLongSupplierSerializer.java @@ -112,7 +112,7 @@ public class BlockLayoutLongSupplierSerializer implements LongSupplierSerializer try (OutputStream out = consolidatedOut.openStream(); InputStream meta = ioPeon.makeInputStream(metaFile)) { ByteStreams.copy(meta, out); - flattener.combineStreams().copyTo(out); + ByteStreams.copy(flattener.combineStreams(), out); } } @@ -148,7 +148,7 @@ public class BlockLayoutLongSupplierSerializer implements LongSupplierSerializer public void writeToChannel(WritableByteChannel channel) throws IOException { try (InputStream meta = ioPeon.makeInputStream(metaFile); - InputStream input = flattener.combineStreams().openStream()) { + InputStream input = flattener.combineStreams().getInput()) { ByteStreams.copy(Channels.newChannel(meta), channel); final ReadableByteChannel from = Channels.newChannel(input); ByteStreams.copy(from, channel); diff --git a/processing/src/main/java/io/druid/segment/data/GenericIndexedWriter.java b/processing/src/main/java/io/druid/segment/data/GenericIndexedWriter.java index 020fccd461f..c568e1f5a06 100644 --- a/processing/src/main/java/io/druid/segment/data/GenericIndexedWriter.java +++ b/processing/src/main/java/io/druid/segment/data/GenericIndexedWriter.java @@ -32,7 +32,6 @@ import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; @@ -134,20 +133,22 @@ public class GenericIndexedWriter implements Closeable valuesOut.getCount(); // value length } - public ByteSource combineStreams() + public InputSupplier combineStreams() { - return ByteSource.concat( + // ByteSource.concat is only available in guava 15 and higher + // This is guava 14 compatible + return ByteStreams.join( Iterables.transform( Arrays.asList("meta", "header", "values"), - new Function() { - + new Function>() + { @Override - public ByteSource apply(final String input) + public InputSupplier apply(final String input) { - return new ByteSource() + return new InputSupplier() { @Override - public InputStream openStream() throws IOException + public InputStream getInput() throws IOException { return ioPeon.makeInputStream(makeFilename(input)); } @@ -160,7 +161,7 @@ public class GenericIndexedWriter implements Closeable public void writeToChannel(WritableByteChannel channel) throws IOException { - final ReadableByteChannel from = Channels.newChannel(combineStreams().openStream()); + final ReadableByteChannel from = Channels.newChannel(combineStreams().getInput()); ByteStreams.copy(from, channel); } }