Make segment creation gauva 14 friendly (#3520)

This commit is contained in:
Charles Allen 2016-10-05 15:25:03 -07:00 committed by Fangjin Yang
parent 1523de08fb
commit 76e77cb610
6 changed files with 48 additions and 26 deletions

View File

@ -84,12 +84,12 @@ public class MetricHolder
}
public static void writeFloatMetric(
ByteSink outSupplier, String name, FloatSupplierSerializer column
final ByteSink outSupplier, String name, FloatSupplierSerializer column
) throws IOException
{
outSupplier.write(version);
serializerUtils.writeString(outSupplier, name);
serializerUtils.writeString(outSupplier, "float");
serializerUtils.writeString(toOutputSupplier(outSupplier), name);
serializerUtils.writeString(toOutputSupplier(outSupplier), "float");
column.closeAndConsolidate(outSupplier);
}
@ -98,8 +98,8 @@ public class MetricHolder
) throws IOException
{
outSupplier.write(version);
serializerUtils.writeString(outSupplier, name);
serializerUtils.writeString(outSupplier, "long");
serializerUtils.writeString(toOutputSupplier(outSupplier), name);
serializerUtils.writeString(toOutputSupplier(outSupplier), "long");
column.closeAndConsolidate(outSupplier);
}
@ -164,6 +164,18 @@ public class MetricHolder
return holder;
}
// This is only for guava14 compat. Eventually it should be able to be removed.
private static OutputSupplier<? extends OutputStream> toOutputSupplier(final ByteSink sink) {
return new OutputSupplier<OutputStream>()
{
@Override
public OutputStream getOutput() throws IOException
{
return sink.openStream();
}
};
}
private final String name;
private final String typeName;
private final MetricType type;

View File

@ -54,6 +54,7 @@ import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.IntBuffer;
import java.nio.MappedByteBuffer;
import java.util.List;
@ -197,7 +198,7 @@ public class StringDimensionMergerLegacy extends StringDimensionMergerV9 impleme
}
@Override
public void writeValueMetadataToFile(FileOutputSupplier valueEncodingFile) throws IOException
public void writeValueMetadataToFile(final FileOutputSupplier valueEncodingFile) throws IOException
{
final SerializerUtils serializerUtils = new SerializerUtils();
@ -218,15 +219,23 @@ public class StringDimensionMergerLegacy extends StringDimensionMergerV9 impleme
@Override
public void writeIndexesToFiles(
ByteSink invertedIndexFile,
OutputSupplier<FileOutputStream> spatialIndexFile
final ByteSink invertedIndexFile,
final OutputSupplier<FileOutputStream> spatialIndexFile
) throws IOException
{
final SerializerUtils serializerUtils = new SerializerUtils();
final OutputSupplier<OutputStream> invertedIndexOutputSupplier = new OutputSupplier<OutputStream>()
{
@Override
public OutputStream getOutput() throws IOException
{
return invertedIndexFile.openStream();
}
};
bitmapWriter.close();
serializerUtils.writeString(invertedIndexFile, dimensionName);
ByteStreams.copy(bitmapWriter.combineStreams(), invertedIndexFile);
serializerUtils.writeString(invertedIndexOutputSupplier, dimensionName);
ByteStreams.copy(bitmapWriter.combineStreams(), invertedIndexOutputSupplier);
if (capabilities.hasSpatialIndexes()) {

View File

@ -289,9 +289,9 @@ public class StringDimensionMergerV9 implements DimensionMergerV9<int[]>
// write dim values to one single file because we need to read it
File dimValueFile = IndexIO.makeDimFile(outDir, dimensionName);
FileOutputStream fos = new FileOutputStream(dimValueFile);
ByteStreams.copy(dictionaryWriter.combineStreams(), fos);
fos.close();
try(FileOutputStream fos = new FileOutputStream(dimValueFile)) {
ByteStreams.copy(dictionaryWriter.combineStreams(), fos);
}
final MappedByteBuffer dimValsMapped = Files.map(dimValueFile);
try (Closeable dimValsMappedUnmapper = new Closeable()

View File

@ -102,7 +102,7 @@ public class BlockLayoutFloatSupplierSerializer implements FloatSupplierSerializ
try (OutputStream out = consolidatedOut.openStream();
InputStream meta = ioPeon.makeInputStream(metaFile)) {
ByteStreams.copy(meta, out);
flattener.combineStreams().copyTo(out);
ByteStreams.copy(flattener.combineStreams(), out);
}
}
@ -135,7 +135,7 @@ public class BlockLayoutFloatSupplierSerializer implements FloatSupplierSerializ
public void writeToChannel(WritableByteChannel channel) throws IOException
{
try (InputStream meta = ioPeon.makeInputStream(metaFile);
InputStream input = flattener.combineStreams().openStream()) {
InputStream input = flattener.combineStreams().getInput()) {
ByteStreams.copy(Channels.newChannel(meta), channel);
final ReadableByteChannel from = Channels.newChannel(input);
ByteStreams.copy(from, channel);

View File

@ -112,7 +112,7 @@ public class BlockLayoutLongSupplierSerializer implements LongSupplierSerializer
try (OutputStream out = consolidatedOut.openStream();
InputStream meta = ioPeon.makeInputStream(metaFile)) {
ByteStreams.copy(meta, out);
flattener.combineStreams().copyTo(out);
ByteStreams.copy(flattener.combineStreams(), out);
}
}
@ -148,7 +148,7 @@ public class BlockLayoutLongSupplierSerializer implements LongSupplierSerializer
public void writeToChannel(WritableByteChannel channel) throws IOException
{
try (InputStream meta = ioPeon.makeInputStream(metaFile);
InputStream input = flattener.combineStreams().openStream()) {
InputStream input = flattener.combineStreams().getInput()) {
ByteStreams.copy(Channels.newChannel(meta), channel);
final ReadableByteChannel from = Channels.newChannel(input);
ByteStreams.copy(from, channel);

View File

@ -32,7 +32,6 @@ import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
@ -134,20 +133,22 @@ public class GenericIndexedWriter<T> implements Closeable
valuesOut.getCount(); // value length
}
public ByteSource combineStreams()
public InputSupplier<InputStream> combineStreams()
{
return ByteSource.concat(
// ByteSource.concat is only available in guava 15 and higher
// This is guava 14 compatible
return ByteStreams.join(
Iterables.transform(
Arrays.asList("meta", "header", "values"),
new Function<String,ByteSource>() {
new Function<String, InputSupplier<InputStream>>()
{
@Override
public ByteSource apply(final String input)
public InputSupplier<InputStream> apply(final String input)
{
return new ByteSource()
return new InputSupplier<InputStream>()
{
@Override
public InputStream openStream() throws IOException
public InputStream getInput() throws IOException
{
return ioPeon.makeInputStream(makeFilename(input));
}
@ -160,7 +161,7 @@ public class GenericIndexedWriter<T> implements Closeable
public void writeToChannel(WritableByteChannel channel) throws IOException
{
final ReadableByteChannel from = Channels.newChannel(combineStreams().openStream());
final ReadableByteChannel from = Channels.newChannel(combineStreams().getInput());
ByteStreams.copy(from, channel);
}
}