From 73d9b316642e2e9fa43ae765ee30bbf4ff277877 Mon Sep 17 00:00:00 2001 From: Roman Leventov Date: Mon, 27 Mar 2017 13:17:31 -0600 Subject: [PATCH] GenericIndexed minor bug fixes, optimizations and refactoring (#3951) * Minor bug fixes in GenericIndexed; Refactor and optimize GenericIndexed; Remove some unnecessary ByteBuffer duplications in some deserialization paths; Add ZeroCopyByteArrayOutputStream * Fixes * Move GenericIndexedWriter.writeLongValueToOutputStream() and writeIntValueToOutputStream() to SerializerUtils * Move constructors * Add GenericIndexedBenchmark * Comments * Typo * Note in Javadoc that IntermediateLongSupplierSerializer, LongColumnSerializer and LongMetricColumnSerializer are thread-unsafe * Use primitive collections in IntermediateLongSupplierSerializer instead of BiMap * Optimize TableLongEncodingWriter * Add checks to SerializerUtils methods * Don't restrict byte order in SerializerUtils.writeLongToOutputStream() and writeIntToOutputStream() * Update GenericIndexedBenchmark * SerializerUtils.writeIntToOutputStream() and writeLongToOutputStream() separate for big-endian and native-endian * Add GenericIndexedBenchmark.indexOf() * More checks in methods in SerializerUtils * Use helperBuffer.arrayOffset() * Optimizations in SerializerUtils --- .../benchmark/GenericIndexedBenchmark.java | 174 +++++ .../bitmap/WrappedImmutableConciseBitmap.java | 2 +- .../bitmap/WrappedImmutableRoaringBitmap.java | 2 +- .../collections/spatial/ImmutableRTree.java | 3 +- .../druid/common/utils/SerializerUtils.java | 120 ++- .../io/ZeroCopyByteArrayOutputStream.java | 47 ++ .../histogram/ApproximateHistogram.java | 9 +- .../ApproximateHistogramFoldingSerde.java | 5 +- .../aggregation/variance/VarianceSerde.java | 5 +- .../common/io/smoosh/SmooshedFileMapper.java | 4 + .../hyperloglog/HyperUniquesSerde.java | 2 + .../main/java/io/druid/segment/IndexIO.java | 51 +- .../java/io/druid/segment/IndexMergerV9.java | 20 +- .../druid/segment/LongColumnSerializer.java | 3 + 
.../segment/LongMetricColumnSerializer.java | 1 + .../druid/segment/data/ByteBufferWriter.java | 6 +- .../data/ConciseBitmapSerdeFactory.java | 5 +- .../io/druid/segment/data/GenericIndexed.java | 739 ++++++++++-------- .../segment/data/GenericIndexedWriter.java | 34 +- .../io/druid/segment/data/IndexedRTree.java | 5 +- .../segment/data/IntBufferIndexedInts.java | 138 ---- .../IntermediateLongSupplierSerializer.java | 21 +- .../io/druid/segment/data/ObjectStrategy.java | 7 +- .../data/RoaringBitmapSerdeFactory.java | 5 +- .../segment/data/TableLongEncodingWriter.java | 16 +- .../io/druid/segment/data/VSizeIndexed.java | 17 +- .../druid/segment/data/VSizeIndexedInts.java | 7 +- .../segment/data/VSizeIndexedIntsWriter.java | 5 +- .../segment/data/VSizeIndexedWriter.java | 2 +- .../druid/segment/data/IndexedIntsTest.java | 3 +- .../segment/data/VSizeIndexedIntsTest.java | 2 +- 31 files changed, 820 insertions(+), 640 deletions(-) create mode 100644 benchmarks/src/main/java/io/druid/benchmark/GenericIndexedBenchmark.java create mode 100644 common/src/main/java/io/druid/io/ZeroCopyByteArrayOutputStream.java delete mode 100644 processing/src/main/java/io/druid/segment/data/IntBufferIndexedInts.java diff --git a/benchmarks/src/main/java/io/druid/benchmark/GenericIndexedBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/GenericIndexedBenchmark.java new file mode 100644 index 00000000000..4b5fb28574d --- /dev/null +++ b/benchmarks/src/main/java/io/druid/benchmark/GenericIndexedBenchmark.java @@ -0,0 +1,174 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.benchmark; + +import com.google.common.io.Files; +import com.google.common.primitives.Ints; +import io.druid.java.util.common.io.smoosh.FileSmoosher; +import io.druid.java.util.common.io.smoosh.SmooshedFileMapper; +import io.druid.segment.data.GenericIndexed; +import io.druid.segment.data.GenericIndexedWriter; +import io.druid.segment.data.ObjectStrategy; +import io.druid.segment.data.TmpFileIOPeon; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.StandardOpenOption; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@OperationsPerInvocation(GenericIndexedBenchmark.ITERATIONS) +@Warmup(iterations = 5) +@Measurement(iterations = 20) +@Fork(1) 
+@State(Scope.Benchmark) +public class GenericIndexedBenchmark +{ + public static final int ITERATIONS = 10000; + + static final ObjectStrategy byteArrayStrategy = new ObjectStrategy() + { + @Override + public Class getClazz() + { + return byte[].class; + } + + @Override + public byte[] fromByteBuffer(ByteBuffer buffer, int numBytes) + { + byte[] result = new byte[numBytes]; + buffer.get(result); + return result; + } + + @Override + public byte[] toBytes(byte[] val) + { + return val; + } + + @Override + public int compare(byte[] o1, byte[] o2) + { + return Integer.compare(Ints.fromByteArray(o1), Ints.fromByteArray(o2)); + } + }; + + @Param({"10000"}) + public int n; + @Param({"8"}) + public int elementSize; + + private File file; + private File smooshDir; + private GenericIndexed genericIndexed; + private int[] iterationIndexes; + private byte[][] elementsToSearch; + + @Setup(Level.Trial) + public void createGenericIndexed() throws IOException + { + GenericIndexedWriter genericIndexedWriter = new GenericIndexedWriter<>( + new TmpFileIOPeon(), + "genericIndexedBenchmark", + byteArrayStrategy + ); + genericIndexedWriter.open(); + + // GenericIndexedWriter caches prevObject for comparison, so two alternating arrays are needed for a correct objectsSorted computation. 
+ ByteBuffer[] elements = new ByteBuffer[2]; + elements[0] = ByteBuffer.allocate(elementSize); + elements[1] = ByteBuffer.allocate(elementSize); + for (int i = 0; i < n; i++) { + ByteBuffer element = elements[i & 1]; + element.putInt(0, i); + genericIndexedWriter.write(element.array()); + } + genericIndexedWriter.close(); + smooshDir = Files.createTempDir(); + file = File.createTempFile("genericIndexedBenchmark", "meta"); + + try (FileChannel fileChannel = + FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE); + FileSmoosher fileSmoosher = new FileSmoosher(smooshDir)) { + genericIndexedWriter.writeToChannel(fileChannel, fileSmoosher); + } + + FileChannel fileChannel = FileChannel.open(file.toPath()); + MappedByteBuffer byteBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, file.length()); + genericIndexed = GenericIndexed.read(byteBuffer, byteArrayStrategy, SmooshedFileMapper.load(smooshDir)); + } + + @Setup(Level.Trial) + public void createIterationIndexes() + { + iterationIndexes = new int[ITERATIONS]; + for (int i = 0; i < ITERATIONS; i++) { + iterationIndexes[i] = ThreadLocalRandom.current().nextInt(n); + } + } + + @Setup(Level.Trial) + public void createElementsToSearch() + { + elementsToSearch = new byte[ITERATIONS][]; + for (int i = 0; i < ITERATIONS; i++) { + elementsToSearch[i] = Ints.toByteArray(ThreadLocalRandom.current().nextInt(n)); + } + } + + @Benchmark + public void get(Blackhole bh) + { + for (int i : iterationIndexes) { + bh.consume(genericIndexed.get(i)); + } + } + + @Benchmark + public int indexOf() + { + int r = 0; + for (byte[] elementToSearch : elementsToSearch) { + r ^= genericIndexed.indexOf(elementToSearch); + } + return r; + } +} diff --git a/bytebuffer-collections/src/main/java/io/druid/collections/bitmap/WrappedImmutableConciseBitmap.java b/bytebuffer-collections/src/main/java/io/druid/collections/bitmap/WrappedImmutableConciseBitmap.java index 137b28d7ef2..c860b38ff7d 100755 --- 
a/bytebuffer-collections/src/main/java/io/druid/collections/bitmap/WrappedImmutableConciseBitmap.java +++ b/bytebuffer-collections/src/main/java/io/druid/collections/bitmap/WrappedImmutableConciseBitmap.java @@ -34,7 +34,7 @@ public class WrappedImmutableConciseBitmap implements ImmutableBitmap public WrappedImmutableConciseBitmap(ByteBuffer byteBuffer) { - this.bitmap = new ImmutableConciseSet(byteBuffer.asReadOnlyBuffer()); + this.bitmap = new ImmutableConciseSet(byteBuffer); } /** diff --git a/bytebuffer-collections/src/main/java/io/druid/collections/bitmap/WrappedImmutableRoaringBitmap.java b/bytebuffer-collections/src/main/java/io/druid/collections/bitmap/WrappedImmutableRoaringBitmap.java index 9b2210ab115..26454d07cc2 100755 --- a/bytebuffer-collections/src/main/java/io/druid/collections/bitmap/WrappedImmutableRoaringBitmap.java +++ b/bytebuffer-collections/src/main/java/io/druid/collections/bitmap/WrappedImmutableRoaringBitmap.java @@ -36,7 +36,7 @@ public class WrappedImmutableRoaringBitmap implements ImmutableBitmap protected WrappedImmutableRoaringBitmap(ByteBuffer byteBuffer) { - this.bitmap = new ImmutableRoaringBitmap(byteBuffer.asReadOnlyBuffer()); + this.bitmap = new ImmutableRoaringBitmap(byteBuffer); } /** diff --git a/bytebuffer-collections/src/main/java/io/druid/collections/spatial/ImmutableRTree.java b/bytebuffer-collections/src/main/java/io/druid/collections/spatial/ImmutableRTree.java index 9da5957170b..9c54b69f7ca 100755 --- a/bytebuffer-collections/src/main/java/io/druid/collections/spatial/ImmutableRTree.java +++ b/bytebuffer-collections/src/main/java/io/druid/collections/spatial/ImmutableRTree.java @@ -50,6 +50,7 @@ public class ImmutableRTree public ImmutableRTree(ByteBuffer data, BitmapFactory bitmapFactory) { + data = data.asReadOnlyBuffer(); final int initPosition = data.position(); Preconditions.checkArgument(data.get(0) == VERSION, "Mismatching versions"); this.numDims = data.getInt(1 + initPosition) & 0x7FFF; @@ -69,7 +70,7 @@ 
public class ImmutableRTree buffer.putInt(rTree.getNumDims()); rTree.getRoot().storeInByteBuffer(buffer, buffer.position()); buffer.position(0); - return new ImmutableRTree(buffer.asReadOnlyBuffer(), rTree.getBitmapFactory()); + return new ImmutableRTree(buffer, rTree.getBitmapFactory()); } private static int calcNumBytes(RTree tree) diff --git a/common/src/main/java/io/druid/common/utils/SerializerUtils.java b/common/src/main/java/io/druid/common/utils/SerializerUtils.java index d0ea1709974..e857ff4344b 100644 --- a/common/src/main/java/io/druid/common/utils/SerializerUtils.java +++ b/common/src/main/java/io/druid/common/utils/SerializerUtils.java @@ -21,14 +21,16 @@ package io.druid.common.utils; import com.google.common.io.ByteStreams; import com.google.common.io.OutputSupplier; +import com.google.common.primitives.Floats; import com.google.common.primitives.Ints; - +import com.google.common.primitives.Longs; import io.druid.collections.IntList; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.nio.channels.WritableByteChannel; import java.nio.charset.Charset; import java.util.Arrays; @@ -38,6 +40,70 @@ public class SerializerUtils { private static final Charset UTF8 = Charset.forName("UTF-8"); + /** + * Writes the given long value into the given OutputStream in big-endian byte order, using the helperBuffer. Faster + * alternative to out.write(Longs.toByteArray(value)), more convenient (sometimes) than wrapping the OutputStream into + * {@link java.io.DataOutputStream}. 
+ * + * @param helperBuffer a big-endian heap ByteBuffer with capacity of at least 8 + */ + public static void writeBigEndianLongToOutputStream(OutputStream out, long value, ByteBuffer helperBuffer) + throws IOException + { + if (helperBuffer.order() != ByteOrder.BIG_ENDIAN || !helperBuffer.hasArray()) { + throw new IllegalArgumentException("Expected writable, big-endian, heap byteBuffer"); + } + helperBuffer.putLong(0, value); + out.write(helperBuffer.array(), helperBuffer.arrayOffset(), Longs.BYTES); + } + + /** + * Writes the given long value into the given OutputStream in the native byte order, using the helperBuffer. + * + * @param helperBuffer a heap ByteBuffer with capacity of at least 8, with the native byte order + */ + public static void writeNativeOrderedLongToOutputStream(OutputStream out, long value, ByteBuffer helperBuffer) + throws IOException + { + if (helperBuffer.order() != ByteOrder.nativeOrder() || !helperBuffer.hasArray()) { + throw new IllegalArgumentException("Expected writable heap byteBuffer with the native byte order"); + } + helperBuffer.putLong(0, value); + out.write(helperBuffer.array(), helperBuffer.arrayOffset(), Longs.BYTES); + } + + /** + * Writes the given int value into the given OutputStream in big-endian byte order, using the helperBuffer. Faster + * alternative to out.write(Ints.toByteArray(value)), more convenient (sometimes) than wrapping the OutputStream into + * {@link java.io.DataOutputStream}. 
+ * + * @param helperBuffer a big-endian heap ByteBuffer with capacity of at least 4 + */ + public static void writeBigEndianIntToOutputStream(OutputStream out, int value, ByteBuffer helperBuffer) + throws IOException + { + if (helperBuffer.order() != ByteOrder.BIG_ENDIAN || !helperBuffer.hasArray()) { + throw new IllegalArgumentException("Expected writable, big-endian, heap byteBuffer"); + } + helperBuffer.putInt(0, value); + out.write(helperBuffer.array(), helperBuffer.arrayOffset(), Ints.BYTES); + } + + /** + * Writes the given int value into the given OutputStream in the native byte order, using the given helperBuffer. + * + * @param helperBuffer a heap ByteBuffer with capacity of at least 4, with the native byte order + */ + public static void writeNativeOrderedIntToOutputStream(OutputStream out, int value, ByteBuffer helperBuffer) + throws IOException + { + if (helperBuffer.order() != ByteOrder.nativeOrder() || !helperBuffer.hasArray()) { + throw new IllegalArgumentException("Expected writable heap byteBuffer with the native byte order"); + } + helperBuffer.putInt(0, value); + out.write(helperBuffer.array(), helperBuffer.arrayOffset(), Ints.BYTES); + } + public void writeString(T out, String name) throws IOException { byte[] nameBytes = name.getBytes(UTF8); @@ -122,16 +188,12 @@ public class SerializerUtils public void writeInt(OutputStream out, int intValue) throws IOException { - byte[] outBytes = new byte[4]; - - ByteBuffer.wrap(outBytes).putInt(intValue); - - out.write(outBytes); + out.write(Ints.toByteArray(intValue)); } public void writeInt(WritableByteChannel out, int intValue) throws IOException { - final ByteBuffer buffer = ByteBuffer.allocate(4); + final ByteBuffer buffer = ByteBuffer.allocate(Ints.BYTES); buffer.putInt(intValue); buffer.flip(); out.write(buffer); @@ -139,19 +201,19 @@ public class SerializerUtils public int readInt(InputStream in) throws IOException { - byte[] intBytes = new byte[4]; + byte[] intBytes = new byte[Ints.BYTES]; 
ByteStreams.readFully(in, intBytes); - return ByteBuffer.wrap(intBytes).getInt(); + return Ints.fromByteArray(intBytes); } public void writeInts(OutputStream out, int[] ints) throws IOException { writeInt(out, ints.length); - for (int i = 0; i < ints.length; i++) { - writeInt(out, ints[i]); + for (int value : ints) { + writeInt(out, value); } } @@ -178,16 +240,12 @@ public class SerializerUtils public void writeLong(OutputStream out, long longValue) throws IOException { - byte[] outBytes = new byte[8]; - - ByteBuffer.wrap(outBytes).putLong(longValue); - - out.write(outBytes); + out.write(Longs.toByteArray(longValue)); } public void writeLong(WritableByteChannel out, long longValue) throws IOException { - final ByteBuffer buffer = ByteBuffer.allocate(8); + final ByteBuffer buffer = ByteBuffer.allocate(Longs.BYTES); buffer.putLong(longValue); buffer.flip(); out.write(buffer); @@ -195,19 +253,19 @@ public class SerializerUtils public long readLong(InputStream in) throws IOException { - byte[] longBytes = new byte[8]; + byte[] longBytes = new byte[Longs.BYTES]; ByteStreams.readFully(in, longBytes); - return ByteBuffer.wrap(longBytes).getLong(); + return Longs.fromByteArray(longBytes); } public void writeLongs(OutputStream out, long[] longs) throws IOException { writeInt(out, longs.length); - for (int i = 0; i < longs.length; i++) { - writeLong(out, longs[i]); + for (long value : longs) { + writeLong(out, value); } } @@ -223,18 +281,14 @@ public class SerializerUtils return retVal; } - public void writeFloat(OutputStream out, float intValue) throws IOException + public void writeFloat(OutputStream out, float floatValue) throws IOException { - byte[] outBytes = new byte[4]; - - ByteBuffer.wrap(outBytes).putFloat(intValue); - - out.write(outBytes); + writeInt(out, Float.floatToRawIntBits(floatValue)); } public void writeFloat(WritableByteChannel out, float floatValue) throws IOException { - final ByteBuffer buffer = ByteBuffer.allocate(4); + final ByteBuffer buffer = 
ByteBuffer.allocate(Floats.BYTES); buffer.putFloat(floatValue); buffer.flip(); out.write(buffer); @@ -242,19 +296,15 @@ public class SerializerUtils public float readFloat(InputStream in) throws IOException { - byte[] floatBytes = new byte[4]; - - ByteStreams.readFully(in, floatBytes); - - return ByteBuffer.wrap(floatBytes).getFloat(); + return Float.intBitsToFloat(readInt(in)); } public void writeFloats(OutputStream out, float[] floats) throws IOException { writeInt(out, floats.length); - for (int i = 0; i < floats.length; i++) { - writeFloat(out, floats[i]); + for (float value : floats) { + writeFloat(out, value); } } diff --git a/common/src/main/java/io/druid/io/ZeroCopyByteArrayOutputStream.java b/common/src/main/java/io/druid/io/ZeroCopyByteArrayOutputStream.java new file mode 100644 index 00000000000..fe1f9ec132c --- /dev/null +++ b/common/src/main/java/io/druid/io/ZeroCopyByteArrayOutputStream.java @@ -0,0 +1,47 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.io; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; + +public class ZeroCopyByteArrayOutputStream extends ByteArrayOutputStream +{ + public ZeroCopyByteArrayOutputStream() + { + } + + public ZeroCopyByteArrayOutputStream(int capacity) + { + super(capacity); + } + + public void writeTo(ByteBuffer outputBuffer) + { + outputBuffer.put(buf, 0, count); + } + + public void writeTo(WritableByteChannel channel) throws IOException + { + channel.write(ByteBuffer.wrap(buf, 0, count)); + } +} diff --git a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogram.java b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogram.java index 6d7dd4fd93e..86e1a8f0c8d 100644 --- a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogram.java +++ b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogram.java @@ -1430,16 +1430,13 @@ public class ApproximateHistogram */ public static ApproximateHistogram fromBytes(ByteBuffer buf) { - ByteBuffer copy = buf.asReadOnlyBuffer(); // negative size indicates compact representation // this works regardless of whether we use int or short for the size since the leftmost bit is the sign bit - if (copy.getShort(buf.position()) < 0) { + if (buf.getShort(buf.position()) < 0) { return fromBytesCompact(buf); } else { - // ignore size - copy.getInt(); - // determine if sparse or dense based on sign of binCount - if (copy.getInt() < 0) { + // ignore size, determine if sparse or dense based on sign of binCount + if (buf.getInt(buf.position() + Ints.BYTES) < 0) { return fromBytesSparse(buf); } else { return fromBytesDense(buf); diff --git a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramFoldingSerde.java 
b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramFoldingSerde.java index 573bf191391..9dc37b61b2a 100644 --- a/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramFoldingSerde.java +++ b/extensions-core/histogram/src/main/java/io/druid/query/aggregation/histogram/ApproximateHistogramFoldingSerde.java @@ -120,9 +120,8 @@ public class ApproximateHistogramFoldingSerde extends ComplexMetricSerde @Override public ApproximateHistogram fromByteBuffer(ByteBuffer buffer, int numBytes) { - final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer(); - readOnlyBuffer.limit(readOnlyBuffer.position() + numBytes); - return ApproximateHistogram.fromBytes(readOnlyBuffer); + buffer.limit(buffer.position() + numBytes); + return ApproximateHistogram.fromBytes(buffer); } @Override diff --git a/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceSerde.java b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceSerde.java index c42667cc0e2..3bcb988a200 100644 --- a/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceSerde.java +++ b/extensions-core/stats/src/main/java/io/druid/query/aggregation/variance/VarianceSerde.java @@ -103,9 +103,8 @@ public class VarianceSerde extends ComplexMetricSerde @Override public VarianceAggregatorCollector fromByteBuffer(ByteBuffer buffer, int numBytes) { - final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer(); - readOnlyBuffer.limit(readOnlyBuffer.position() + numBytes); - return VarianceAggregatorCollector.from(readOnlyBuffer); + buffer.limit(buffer.position() + numBytes); + return VarianceAggregatorCollector.from(buffer); } @Override diff --git a/java-util/src/main/java/io/druid/java/util/common/io/smoosh/SmooshedFileMapper.java b/java-util/src/main/java/io/druid/java/util/common/io/smoosh/SmooshedFileMapper.java index ba6da3b5fdc..1aee6aa1742 100644 --- 
a/java-util/src/main/java/io/druid/java/util/common/io/smoosh/SmooshedFileMapper.java +++ b/java-util/src/main/java/io/druid/java/util/common/io/smoosh/SmooshedFileMapper.java @@ -111,6 +111,10 @@ public class SmooshedFileMapper implements Closeable return internalFiles.keySet(); } + /** + * Returns a mapped buffer of the smooshed file with the given name. Buffer's contents from 0 to capacity() are the + * whole mapped file contents, limit() is equal to capacity(). + */ public ByteBuffer mapFile(String name) throws IOException { final Metadata metadata = internalFiles.get(name); diff --git a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesSerde.java b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesSerde.java index c494954143f..0ab39581c1a 100644 --- a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesSerde.java +++ b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniquesSerde.java @@ -120,6 +120,8 @@ public class HyperUniquesSerde extends ComplexMetricSerde @Override public HyperLogLogCollector fromByteBuffer(ByteBuffer buffer, int numBytes) { + // Make a copy of the buffer, because HyperLogLogCollector.makeCollector() does not duplicate the given buffer + // but stores it in a field. 
final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer(); readOnlyBuffer.limit(readOnlyBuffer.position() + numBytes); return HyperLogLogCollector.makeCollector(readOnlyBuffer); diff --git a/processing/src/main/java/io/druid/segment/IndexIO.java b/processing/src/main/java/io/druid/segment/IndexIO.java index d2dedc405e8..880b2e16c09 100644 --- a/processing/src/main/java/io/druid/segment/IndexIO.java +++ b/processing/src/main/java/io/druid/segment/IndexIO.java @@ -43,6 +43,7 @@ import io.druid.collections.bitmap.ImmutableBitmap; import io.druid.collections.bitmap.MutableBitmap; import io.druid.collections.spatial.ImmutableRTree; import io.druid.common.utils.SerializerUtils; +import io.druid.io.ZeroCopyByteArrayOutputStream; import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; import io.druid.java.util.common.io.smoosh.FileSmoosher; @@ -689,17 +690,7 @@ public class IndexIO final ColumnDescriptor serdeficator = builder .addSerde(columnPartBuilder.build()) .build(); - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - serializerUtils.writeString(baos, mapper.writeValueAsString(serdeficator)); - byte[] specBytes = baos.toByteArray(); - - final SmooshedWriter channel = v9Smoosher.addWithSmooshedWriter( - dimension, serdeficator.numBytes() + specBytes.length - ); - channel.write(ByteBuffer.wrap(specBytes)); - serdeficator.write(channel, v9Smoosher); - channel.close(); + makeColumn(v9Smoosher, dimension, serdeficator); } else if (filename.startsWith("met_") || filename.startsWith("numeric_dim_")) { // NOTE: identifying numeric dimensions by using a different filename pattern is meant to allow the // legacy merger (which will be deprecated) to support long/float dims. 
Going forward, the V9 merger @@ -751,17 +742,7 @@ public class IndexIO } final ColumnDescriptor serdeficator = builder.build(); - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - serializerUtils.writeString(baos, mapper.writeValueAsString(serdeficator)); - byte[] specBytes = baos.toByteArray(); - - final SmooshedWriter channel = v9Smoosher.addWithSmooshedWriter( - metric, serdeficator.numBytes() + specBytes.length - ); - channel.write(ByteBuffer.wrap(specBytes)); - serdeficator.write(channel, v9Smoosher); - channel.close(); + makeColumn(v9Smoosher, metric, serdeficator); } else if (String.format("time_%s.drd", BYTE_ORDER).equals(filename)) { CompressedLongsIndexedSupplier timestamps = CompressedLongsIndexedSupplier.fromByteBuffer( v8SmooshedFiles.mapFile(filename), @@ -778,17 +759,7 @@ public class IndexIO .build() ); final ColumnDescriptor serdeficator = builder.build(); - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - serializerUtils.writeString(baos, mapper.writeValueAsString(serdeficator)); - byte[] specBytes = baos.toByteArray(); - - final SmooshedWriter channel = v9Smoosher.addWithSmooshedWriter( - "__time", serdeficator.numBytes() + specBytes.length - ); - channel.write(ByteBuffer.wrap(specBytes)); - serdeficator.write(channel, v9Smoosher); - channel.close(); + makeColumn(v9Smoosher, "__time", serdeficator); } else { skippedFiles.add(filename); } @@ -854,6 +825,20 @@ public class IndexIO closer.close(); } } + + private void makeColumn(FileSmoosher v9Smoosher, String dimension, ColumnDescriptor serdeficator) + throws IOException + { + ZeroCopyByteArrayOutputStream specBytes = new ZeroCopyByteArrayOutputStream(); + serializerUtils.writeString(specBytes, mapper.writeValueAsString(serdeficator)); + + try (SmooshedWriter channel = v9Smoosher.addWithSmooshedWriter( + dimension, serdeficator.numBytes() + specBytes.size() + )) { + specBytes.writeTo(channel); + serdeficator.write(channel, v9Smoosher); + } + } } static interface IndexLoader 
diff --git a/processing/src/main/java/io/druid/segment/IndexMergerV9.java b/processing/src/main/java/io/druid/segment/IndexMergerV9.java index 62347cd68f1..8e9f35a3656 100644 --- a/processing/src/main/java/io/druid/segment/IndexMergerV9.java +++ b/processing/src/main/java/io/druid/segment/IndexMergerV9.java @@ -31,6 +31,7 @@ import com.google.common.io.Files; import com.google.common.primitives.Ints; import com.google.inject.Inject; import io.druid.common.utils.JodaUtils; +import io.druid.io.ZeroCopyByteArrayOutputStream; import io.druid.java.util.common.ISE; import io.druid.java.util.common.io.smoosh.FileSmoosher; import io.druid.java.util.common.io.smoosh.SmooshedWriter; @@ -56,7 +57,6 @@ import org.apache.commons.io.FileUtils; import org.joda.time.DateTime; import org.joda.time.Interval; -import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; @@ -402,20 +402,14 @@ public class IndexMergerV9 extends IndexMerger final ColumnDescriptor serdeficator ) throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - serializerUtils.writeString(baos, mapper.writeValueAsString(serdeficator)); - byte[] specBytes = baos.toByteArray(); - - final SmooshedWriter channel = v9Smoosher.addWithSmooshedWriter( - columnName, serdeficator.numBytes() + specBytes.length - ); - try { - channel.write(ByteBuffer.wrap(specBytes)); + ZeroCopyByteArrayOutputStream specBytes = new ZeroCopyByteArrayOutputStream(); + serializerUtils.writeString(specBytes, mapper.writeValueAsString(serdeficator)); + try (SmooshedWriter channel = v9Smoosher.addWithSmooshedWriter( + columnName, serdeficator.numBytes() + specBytes.size() + )) { + specBytes.writeTo(channel); serdeficator.write(channel, v9Smoosher); } - finally { - channel.close(); - } } private void mergeIndexesAndWriteColumns( diff --git a/processing/src/main/java/io/druid/segment/LongColumnSerializer.java 
b/processing/src/main/java/io/druid/segment/LongColumnSerializer.java index 2c164f59d93..8583ae4ef38 100644 --- a/processing/src/main/java/io/druid/segment/LongColumnSerializer.java +++ b/processing/src/main/java/io/druid/segment/LongColumnSerializer.java @@ -29,6 +29,9 @@ import java.io.IOException; import java.nio.ByteOrder; import java.nio.channels.WritableByteChannel; +/** + * Unsafe for concurrent use from multiple threads. + */ public class LongColumnSerializer implements GenericColumnSerializer { public static LongColumnSerializer create( diff --git a/processing/src/main/java/io/druid/segment/LongMetricColumnSerializer.java b/processing/src/main/java/io/druid/segment/LongMetricColumnSerializer.java index 12edc7f28e5..263f740dcca 100644 --- a/processing/src/main/java/io/druid/segment/LongMetricColumnSerializer.java +++ b/processing/src/main/java/io/druid/segment/LongMetricColumnSerializer.java @@ -30,6 +30,7 @@ import java.io.File; import java.io.IOException; /** + * Unsafe for concurrent use from multiple threads. 
*/ public class LongMetricColumnSerializer implements MetricColumnSerializer { diff --git a/processing/src/main/java/io/druid/segment/data/ByteBufferWriter.java b/processing/src/main/java/io/druid/segment/data/ByteBufferWriter.java index afe13d0a49c..82baa089baa 100644 --- a/processing/src/main/java/io/druid/segment/data/ByteBufferWriter.java +++ b/processing/src/main/java/io/druid/segment/data/ByteBufferWriter.java @@ -26,11 +26,13 @@ import com.google.common.io.ByteStreams; import com.google.common.io.CountingOutputStream; import com.google.common.io.InputSupplier; import com.google.common.primitives.Ints; +import io.druid.common.utils.SerializerUtils; import io.druid.java.util.common.io.smoosh.FileSmoosher; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; +import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; @@ -46,6 +48,7 @@ public class ByteBufferWriter implements Closeable private CountingOutputStream headerOut = null; private CountingOutputStream valueOut = null; + private final ByteBuffer helperBuffer = ByteBuffer.allocate(Ints.BYTES); public ByteBufferWriter( IOPeon ioPeon, @@ -67,8 +70,7 @@ public class ByteBufferWriter implements Closeable public void write(T objectToWrite) throws IOException { byte[] bytesToWrite = strategy.toBytes(objectToWrite); - - headerOut.write(Ints.toByteArray(bytesToWrite.length)); + SerializerUtils.writeBigEndianIntToOutputStream(headerOut, bytesToWrite.length, helperBuffer); valueOut.write(bytesToWrite); } diff --git a/processing/src/main/java/io/druid/segment/data/ConciseBitmapSerdeFactory.java b/processing/src/main/java/io/druid/segment/data/ConciseBitmapSerdeFactory.java index 7cdcf6937e3..de06317428f 100644 --- a/processing/src/main/java/io/druid/segment/data/ConciseBitmapSerdeFactory.java +++ b/processing/src/main/java/io/druid/segment/data/ConciseBitmapSerdeFactory.java @@ -80,9 +80,8 @@ 
public class ConciseBitmapSerdeFactory implements BitmapSerdeFactory @Override public WrappedImmutableConciseBitmap fromByteBuffer(ByteBuffer buffer, int numBytes) { - final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer(); - readOnlyBuffer.limit(readOnlyBuffer.position() + numBytes); - return new WrappedImmutableConciseBitmap(new ImmutableConciseSet(readOnlyBuffer)); + buffer.limit(buffer.position() + numBytes); + return new WrappedImmutableConciseBitmap(new ImmutableConciseSet(buffer)); } @Override diff --git a/processing/src/main/java/io/druid/segment/data/GenericIndexed.java b/processing/src/main/java/io/druid/segment/data/GenericIndexed.java index 6e278ea79b1..d67f6df0d2c 100644 --- a/processing/src/main/java/io/druid/segment/data/GenericIndexed.java +++ b/processing/src/main/java/io/druid/segment/data/GenericIndexed.java @@ -19,26 +19,23 @@ package io.druid.segment.data; -import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import com.google.common.primitives.Ints; import io.druid.common.utils.SerializerUtils; +import io.druid.io.ZeroCopyByteArrayOutputStream; import io.druid.java.util.common.IAE; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.guava.CloseQuietly; import io.druid.java.util.common.io.smoosh.SmooshedFileMapper; import it.unimi.dsi.fastutil.bytes.ByteArrays; -import java.io.ByteArrayOutputStream; import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.WritableByteChannel; -import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; -import java.util.List; /** * A generic, flat storage mechanism. Use static methods fromArray() or fromIterable() to construct. 
If input @@ -71,9 +68,10 @@ import java.util.List; */ public class GenericIndexed implements Indexed { - public static final byte VERSION_ONE = 0x1; - public static final byte VERSION_TWO = 0x2; - private static final byte REVERSE_LOOKUP_ALLOWED = 0x1; + static final byte VERSION_ONE = 0x1; + static final byte VERSION_TWO = 0x2; + static final byte REVERSE_LOOKUP_ALLOWED = 0x1; + static final byte REVERSE_LOOKUP_DISALLOWED = 0x0; private final static Ordering NATURAL_STRING_ORDERING = Ordering.natural().nullsFirst(); private static final SerializerUtils SERIALIZER_UTILS = new SerializerUtils(); @@ -107,129 +105,32 @@ public class GenericIndexed implements Indexed } }; - private final ObjectStrategy strategy; - private final boolean allowReverseLookup; - private final int size; - private final BufferIndexed bufferIndexed; - private final List valueBuffers; - private final ByteBuffer headerBuffer; - private int logBaseTwoOfElementsPerValueFile; - - private ByteBuffer theBuffer; - - // used for single file version, v1 - GenericIndexed( - ByteBuffer buffer, - ObjectStrategy strategy, - boolean allowReverseLookup - ) + public static GenericIndexed read(ByteBuffer buffer, ObjectStrategy strategy) { - this.theBuffer = buffer; - this.strategy = strategy; - this.allowReverseLookup = allowReverseLookup; - size = theBuffer.getInt(); + byte versionFromBuffer = buffer.get(); - int indexOffset = theBuffer.position(); - int valuesOffset = theBuffer.position() + size * Ints.BYTES; - - buffer.position(valuesOffset); - valueBuffers = Lists.newArrayList(buffer.slice()); - buffer.position(indexOffset); - headerBuffer = buffer.slice(); - final ByteBuffer valueBuffer = valueBuffers.get(0); - bufferIndexed = new BufferIndexed() - { - @Override - public T get(int index) - { - checkIndex(index, size); - - final int startOffset; - final int endOffset; - - if (index == 0) { - startOffset = 4; - endOffset = headerBuffer.getInt(0); - } else { - int headerPosition = (index - 1) * Ints.BYTES; - 
startOffset = headerBuffer.getInt(headerPosition) + Ints.BYTES; - endOffset = headerBuffer.getInt(headerPosition + Ints.BYTES); - } - return _get(valueBuffer.asReadOnlyBuffer(), startOffset, endOffset); - } - }; + if (VERSION_ONE == versionFromBuffer) { + return createGenericIndexedVersionOne(buffer, strategy); + } else if (VERSION_TWO == versionFromBuffer) { + throw new IAE( + "use read(ByteBuffer buffer, ObjectStrategy strategy, SmooshedFileMapper fileMapper)" + + " to read version 2 indexed." + ); + } + throw new IAE("Unknown version[%d]", (int) versionFromBuffer); } - // used for multiple file version, v2. - GenericIndexed( - List valueBuffs, - ByteBuffer headerBuff, - ObjectStrategy strategy, - boolean allowReverseLookup, - int logBaseTwoOfElementsPerValueFile, - int numWritten - ) + public static GenericIndexed read(ByteBuffer buffer, ObjectStrategy strategy, SmooshedFileMapper fileMapper) { - this.strategy = strategy; - this.allowReverseLookup = allowReverseLookup; - this.valueBuffers = valueBuffs; - this.headerBuffer = headerBuff; - this.size = numWritten; - this.logBaseTwoOfElementsPerValueFile = logBaseTwoOfElementsPerValueFile; - headerBuffer.order(ByteOrder.nativeOrder()); - bufferIndexed = new BufferIndexed() - { - @Override - public T get(int index) - { - int fileNum = index >> GenericIndexed.this.logBaseTwoOfElementsPerValueFile; - final ByteBuffer copyBuffer = valueBuffers.get(fileNum).asReadOnlyBuffer(); + byte versionFromBuffer = buffer.get(); - checkIndex(index, size); - - final int startOffset; - final int endOffset; - int relativePositionOfIndex = index & ((1 << GenericIndexed.this.logBaseTwoOfElementsPerValueFile) - 1); - if (relativePositionOfIndex == 0) { - int headerPosition = index * Ints.BYTES; - startOffset = 4; - endOffset = headerBuffer.getInt(headerPosition); - } else { - int headerPosition = (index - 1) * Ints.BYTES; - startOffset = headerBuffer.getInt(headerPosition) + 4; - endOffset = headerBuffer.getInt(headerPosition + 4); - } - 
return _get(copyBuffer, startOffset, endOffset); - } - }; - } - - public static int getNumberOfFilesRequired(int bagSize, long numWritten) - { - int numberOfFilesRequired = (int) (numWritten / bagSize); - if ((numWritten % bagSize) != 0) { - numberOfFilesRequired += 1; + if (VERSION_ONE == versionFromBuffer) { + return createGenericIndexedVersionOne(buffer, strategy); + } else if (VERSION_TWO == versionFromBuffer) { + return createGenericIndexedVersionTwo(buffer, strategy, fileMapper); } - return numberOfFilesRequired; - } - /** - * Checks if {@code index} a valid `element index` in GenericIndexed. - * Similar to Preconditions.checkElementIndex() except this method throws {@link IAE} with custom error message. - *

- * Used here to get existing behavior(same error message and exception) of V1 GenericIndexed. - * - * @param index index identifying an element of an GenericIndexed. - * @param size number of elements. - */ - private static void checkIndex(int index, int size) - { - if (index < 0) { - throw new IAE("Index[%s] < 0", index); - } - if (index >= size) { - throw new IAE(String.format("Index[%s] >= size[%s]", index, size)); - } + throw new IAE("Unknown version [%s]", versionFromBuffer); } public static GenericIndexed fromArray(T[] objects, ObjectStrategy strategy) @@ -239,160 +140,121 @@ public class GenericIndexed implements Indexed public static GenericIndexed fromIterable(Iterable objectsIterable, ObjectStrategy strategy) { - Iterator objects = objectsIterable.iterator(); - if (!objects.hasNext()) { - final ByteBuffer buffer = ByteBuffer.allocate(4).putInt(0); - buffer.flip(); - return new GenericIndexed(buffer, strategy, true); - } - - boolean allowReverseLookup = true; - int count = 0; - - ByteArrayOutputStream headerBytes = new ByteArrayOutputStream(); - ByteArrayOutputStream valueBytes = new ByteArrayOutputStream(); - try { - int offset = 0; - T prevVal = null; - do { - count++; - T next = objects.next(); - if (allowReverseLookup && prevVal != null && !(strategy.compare(prevVal, next) < 0)) { - allowReverseLookup = false; - } - - final byte[] bytes = strategy.toBytes(next); - offset += 4 + bytes.length; - headerBytes.write(Ints.toByteArray(offset)); - valueBytes.write(Ints.toByteArray(bytes.length)); - valueBytes.write(bytes); - - if (prevVal instanceof Closeable) { - CloseQuietly.close((Closeable) prevVal); - } - prevVal = next; - } while (objects.hasNext()); - - if (prevVal instanceof Closeable) { - CloseQuietly.close((Closeable) prevVal); - } - } - catch (IOException e) { - throw new RuntimeException(e); - } - - ByteBuffer theBuffer = ByteBuffer.allocate(Ints.BYTES + headerBytes.size() + valueBytes.size()); - theBuffer.put(Ints.toByteArray(count)); - 
theBuffer.put(headerBytes.toByteArray()); - theBuffer.put(valueBytes.toByteArray()); - theBuffer.flip(); - - return new GenericIndexed(theBuffer.asReadOnlyBuffer(), strategy, allowReverseLookup); + return fromIterableVersionOne(objectsIterable, strategy); } - public static GenericIndexed read(ByteBuffer buffer, ObjectStrategy strategy) + static int getNumberOfFilesRequired(int bagSize, long numWritten) { - byte versionFromBuffer = buffer.get(); - - if (VERSION_ONE == versionFromBuffer) { - return createVersionOneGenericIndexed(buffer, strategy); - } else if (VERSION_TWO == versionFromBuffer) { - throw new IAE( - "use read(ByteBuffer buffer, ObjectStrategy strategy, SmooshedFileMapper fileMapper)" - + " to read version 2 indexed.", - versionFromBuffer - ); + int numberOfFilesRequired = (int) (numWritten / bagSize); + if ((numWritten % bagSize) != 0) { + numberOfFilesRequired += 1; } - throw new IAE("Unknown version[%s]", versionFromBuffer); + return numberOfFilesRequired; } - private static GenericIndexed createVersionOneGenericIndexed(ByteBuffer byteBuffer, ObjectStrategy strategy) - { - boolean allowReverseLookup = byteBuffer.get() == REVERSE_LOOKUP_ALLOWED; - int size = byteBuffer.getInt(); - ByteBuffer bufferToUse = byteBuffer.asReadOnlyBuffer(); - bufferToUse.limit(bufferToUse.position() + size); - byteBuffer.position(bufferToUse.limit()); - return new GenericIndexed( - bufferToUse, - strategy, - allowReverseLookup - ); - } + private final boolean versionOne; - private static GenericIndexed createVersionTwoGenericIndexed( - ByteBuffer byteBuffer, + private final ObjectStrategy strategy; + private final boolean allowReverseLookup; + private final int size; + private final ByteBuffer headerBuffer; + + private final ByteBuffer firstValueBuffer; + + private final ByteBuffer[] valueBuffers; + private int logBaseTwoOfElementsPerValueFile; + private int relativeIndexMask; + + private ByteBuffer theBuffer; + + /** + * Constructor for version one. 
+ */ + GenericIndexed( + ByteBuffer buffer, ObjectStrategy strategy, - SmooshedFileMapper fileMapper + boolean allowReverseLookup ) { - if (fileMapper == null) { - throw new IAE("SmooshedFileMapper can not be null for version 2."); - } - boolean allowReverseLookup = byteBuffer.get() == REVERSE_LOOKUP_ALLOWED; - int logBaseTwoOfElementsPerValueFile = byteBuffer.getInt(); - int numElements = byteBuffer.getInt(); - String columnName; + this.versionOne = true; - List valueBuffersToUse; - ByteBuffer headerBuffer; - try { - columnName = SERIALIZER_UTILS.readString(byteBuffer); - valueBuffersToUse = Lists.newArrayList(); - int elementsPerValueFile = 1 << logBaseTwoOfElementsPerValueFile; - int numberOfFilesRequired = getNumberOfFilesRequired(elementsPerValueFile, numElements); - for (int i = 0; i < numberOfFilesRequired; i++) { - valueBuffersToUse.add( - fileMapper.mapFile(GenericIndexedWriter.generateValueFileName(columnName, i)) - .asReadOnlyBuffer() - ); - } - headerBuffer = fileMapper.mapFile(GenericIndexedWriter.generateHeaderFileName(columnName)); - } - catch (IOException e) { - throw new RuntimeException("File mapping failed.", e); - } + this.theBuffer = buffer; + this.strategy = strategy; + this.allowReverseLookup = allowReverseLookup; + size = theBuffer.getInt(); - return new GenericIndexed( - valueBuffersToUse, - headerBuffer, - strategy, - allowReverseLookup, - logBaseTwoOfElementsPerValueFile, - numElements - ); + int indexOffset = theBuffer.position(); + int valuesOffset = theBuffer.position() + size * Ints.BYTES; + + buffer.position(valuesOffset); + // Ensure the value buffer's limit equals to capacity. + firstValueBuffer = buffer.slice(); + valueBuffers = new ByteBuffer[]{firstValueBuffer}; + buffer.position(indexOffset); + headerBuffer = buffer.slice(); } - public static GenericIndexed read(ByteBuffer buffer, ObjectStrategy strategy, SmooshedFileMapper fileMapper) + + /** + * Constructor for version two. 
+ */ + GenericIndexed( + ByteBuffer[] valueBuffs, + ByteBuffer headerBuff, + ObjectStrategy strategy, + boolean allowReverseLookup, + int logBaseTwoOfElementsPerValueFile, + int numWritten + ) { - byte versionFromBuffer = buffer.get(); + this.versionOne = false; - if (VERSION_ONE == versionFromBuffer) { - return createVersionOneGenericIndexed(buffer, strategy); - } else if (VERSION_TWO == versionFromBuffer) { - return createVersionTwoGenericIndexed(buffer, strategy, fileMapper); + this.strategy = strategy; + this.allowReverseLookup = allowReverseLookup; + this.valueBuffers = valueBuffs; + this.firstValueBuffer = valueBuffers[0]; + this.headerBuffer = headerBuff; + this.size = numWritten; + this.logBaseTwoOfElementsPerValueFile = logBaseTwoOfElementsPerValueFile; + this.relativeIndexMask = (1 << logBaseTwoOfElementsPerValueFile) - 1; + headerBuffer.order(ByteOrder.nativeOrder()); + } + + /** + * Checks if {@code index} a valid `element index` in GenericIndexed. + * Similar to Preconditions.checkElementIndex() except this method throws {@link IAE} with custom error message. + *

+ * Used here to get existing behavior(same error message and exception) of V1 GenericIndexed. + * + * @param index index identifying an element of an GenericIndexed. + */ + private void checkIndex(int index) + { + if (index < 0) { + throw new IAE("Index[%s] < 0", index); + } + if (index >= size) { + throw new IAE(String.format("Index[%s] >= size[%s]", index, size)); } - - throw new IAE("Unknown version [%s]", versionFromBuffer); } @Override public Class getClazz() { - return bufferIndexed.getClazz(); + return strategy.getClazz(); } @Override public int size() { - return bufferIndexed.size(); + return size; } @Override public T get(int index) { - return bufferIndexed.get(index); + return versionOne ? getVersionOne(index) : getVersionTwo(index); } /** @@ -407,35 +269,56 @@ public class GenericIndexed implements Indexed @Override public int indexOf(T value) { - return bufferIndexed.indexOf(value); + return indexOf(this, value); + } + + private int indexOf(Indexed indexed, T value) + { + if (!allowReverseLookup) { + throw new UnsupportedOperationException("Reverse lookup not allowed."); + } + + value = (value != null && value.equals("")) ? null : value; + + int minIndex = 0; + int maxIndex = size - 1; + while (minIndex <= maxIndex) { + int currIndex = (minIndex + maxIndex) >>> 1; + + T currValue = indexed.get(currIndex); + int comparison = strategy.compare(currValue, value); + if (comparison == 0) { + return currIndex; + } + + if (comparison < 0) { + minIndex = currIndex + 1; + } else { + maxIndex = currIndex - 1; + } + } + + return -(minIndex + 1); } @Override public Iterator iterator() { - return bufferIndexed.iterator(); + return IndexedIterable.create(this).iterator(); } public long getSerializedSize() { - if (valueBuffers.size() != 1) { + if (!versionOne) { throw new UnsupportedOperationException("Method not supported for version 2 GenericIndexed."); } - return theBuffer.remaining() - + 2 - + Ints.BYTES - + Ints.BYTES; //2 Bytes for version and sorted flag. 
4 bytes to store numbers - // of bytes and next 4 bytes to store number of elements. + return getSerializedSizeVersionOne(); } public void writeToChannel(WritableByteChannel channel) throws IOException { - //version 2 will always have more than one buffer in valueBuffers. - if (valueBuffers.size() == 1) { - channel.write(ByteBuffer.wrap(new byte[]{VERSION_ONE, allowReverseLookup ? (byte) 0x1 : (byte) 0x0})); - channel.write(ByteBuffer.wrap(Ints.toByteArray(theBuffer.remaining() + 4))); // 4 Bytes to store size. - channel.write(ByteBuffer.wrap(Ints.toByteArray(size))); - channel.write(theBuffer.asReadOnlyBuffer()); + if (versionOne) { + writeToChannelVersionOne(channel); } else { throw new UnsupportedOperationException( "GenericIndexed serialization for V2 is unsupported. Use GenericIndexedWriter instead."); @@ -449,64 +332,19 @@ public class GenericIndexed implements Indexed */ public GenericIndexed.BufferIndexed singleThreaded() { - if (valueBuffers.size() == 1) { - final ByteBuffer copyBuffer = valueBuffers.get(0).asReadOnlyBuffer(); - return new BufferIndexed() - { - @Override - public T get(final int index) - { - checkIndex(index, size); + return versionOne ? 
singleThreadedVersionOne() : singleThreadedVersionTwo(); + } - final int startOffset; - final int endOffset; - - if (index == 0) { - startOffset = 4; - endOffset = headerBuffer.getInt(0); - } else { - int headerPosition = (index - 1) * Ints.BYTES; - startOffset = headerBuffer.getInt(headerPosition) + 4; - endOffset = headerBuffer.getInt(headerPosition + 4); - } - return _get(copyBuffer, startOffset, endOffset); - } - }; - } else { - - final List copyValueBuffers = new ArrayList<>(); - for (ByteBuffer buffer : valueBuffers) { - copyValueBuffers.add(buffer.asReadOnlyBuffer()); - } - - return new BufferIndexed() - { - @Override - public T get(final int index) - { - int fileNum = index >> logBaseTwoOfElementsPerValueFile; - final ByteBuffer copyBuffer = copyValueBuffers.get(fileNum); - - checkIndex(index, size); - final int startOffset; - final int endOffset; - - int relativePositionOfIndex = index & ((1 << logBaseTwoOfElementsPerValueFile) - 1); - if (relativePositionOfIndex == 0) { - int headerPosition = index * Ints.BYTES; - startOffset = 4; - endOffset = headerBuffer.getInt(headerPosition); - } else { - int headerPosition = (index - 1) * Ints.BYTES; - startOffset = headerBuffer.getInt(headerPosition) + 4; - endOffset = headerBuffer.getInt(headerPosition + 4); - } - - return _get(copyBuffer, startOffset, endOffset); - } - }; + private T copyBufferAndGet(ByteBuffer valueBuffer, int startOffset, int endOffset) + { + final int size = endOffset - startOffset; + if (size == 0) { + return null; } - + ByteBuffer copyValueBuffer = valueBuffer.asReadOnlyBuffer(); + copyValueBuffer.position(startOffset); + // fromByteBuffer must not modify the buffer limit + return strategy.fromByteBuffer(copyValueBuffer, size); } abstract class BufferIndexed implements Indexed @@ -525,15 +363,19 @@ public class GenericIndexed implements Indexed return size; } - protected T _get(ByteBuffer copyValueBuffer, int startOffset, int endOffset) + T bufferedIndexedGet(ByteBuffer copyValueBuffer, int 
startOffset, int endOffset) { final int size = endOffset - startOffset; - if (startOffset == endOffset) { + lastReadSize = size; + if (size == 0) { return null; } + // ObjectStrategy.fromByteBuffer() is allowed to reset the limit of the buffer. So if the limit is changed, + // position() call in the next line could throw an exception, if the position is set beyond the new limit. clear() + // sets the limit to the maximum possible, the capacity. It is safe to reset the limit to capacity, because the + // value buffer(s) initial limit equals to capacity. + copyValueBuffer.clear(); copyValueBuffer.position(startOffset); - lastReadSize = size; - // fromByteBuffer must not modify the buffer limit return strategy.fromByteBuffer(copyValueBuffer, size); } @@ -542,7 +384,7 @@ public class GenericIndexed implements Indexed * * @return the size in bytes of the last value read */ - public int getLastValueSize() + int getLastValueSize() { return lastReadSize; } @@ -550,38 +392,247 @@ public class GenericIndexed implements Indexed @Override public int indexOf(T value) { - if (!allowReverseLookup) { - throw new UnsupportedOperationException("Reverse lookup not allowed."); - } - - value = (value != null && value.equals("")) ? 
null : value; - - int minIndex = 0; - int maxIndex = size - 1; - while (minIndex <= maxIndex) { - int currIndex = (minIndex + maxIndex) >>> 1; - - T currValue = get(currIndex); - int comparison = strategy.compare(currValue, value); - if (comparison == 0) { - return currIndex; - } - - if (comparison < 0) { - minIndex = currIndex + 1; - } else { - maxIndex = currIndex - 1; - } - } - - return -(minIndex + 1); + return GenericIndexed.this.indexOf(this, value); } @Override public Iterator iterator() { - return IndexedIterable.create(this).iterator(); + return GenericIndexed.this.iterator(); } } + /////////////// + // VERSION ONE + /////////////// + + private static GenericIndexed createGenericIndexedVersionOne(ByteBuffer byteBuffer, ObjectStrategy strategy) + { + boolean allowReverseLookup = byteBuffer.get() == REVERSE_LOOKUP_ALLOWED; + int size = byteBuffer.getInt(); + ByteBuffer bufferToUse = byteBuffer.asReadOnlyBuffer(); + bufferToUse.limit(bufferToUse.position() + size); + byteBuffer.position(bufferToUse.limit()); + + return new GenericIndexed<>( + bufferToUse, + strategy, + allowReverseLookup + ); + } + + private static GenericIndexed fromIterableVersionOne(Iterable objectsIterable, ObjectStrategy strategy) + { + Iterator objects = objectsIterable.iterator(); + if (!objects.hasNext()) { + final ByteBuffer buffer = ByteBuffer.allocate(Ints.BYTES).putInt(0); + buffer.flip(); + return new GenericIndexed<>(buffer, strategy, true); + } + + boolean allowReverseLookup = true; + int count = 0; + + ZeroCopyByteArrayOutputStream headerBytes = new ZeroCopyByteArrayOutputStream(); + ZeroCopyByteArrayOutputStream valueBytes = new ZeroCopyByteArrayOutputStream(); + ByteBuffer helperBuffer = ByteBuffer.allocate(Ints.BYTES); + try { + int offset = 0; + T prevVal = null; + do { + count++; + T next = objects.next(); + if (allowReverseLookup && prevVal != null && !(strategy.compare(prevVal, next) < 0)) { + allowReverseLookup = false; + } + + final byte[] bytes = 
strategy.toBytes(next); + offset += Ints.BYTES + bytes.length; + SerializerUtils.writeBigEndianIntToOutputStream(headerBytes, offset, helperBuffer); + SerializerUtils.writeBigEndianIntToOutputStream(valueBytes, bytes.length, helperBuffer); + valueBytes.write(bytes); + + if (prevVal instanceof Closeable) { + CloseQuietly.close((Closeable) prevVal); + } + prevVal = next; + } while (objects.hasNext()); + + if (prevVal instanceof Closeable) { + CloseQuietly.close((Closeable) prevVal); + } + } + catch (IOException e) { + throw new RuntimeException(e); + } + + ByteBuffer theBuffer = ByteBuffer.allocate(Ints.BYTES + headerBytes.size() + valueBytes.size()); + theBuffer.putInt(count); + headerBytes.writeTo(theBuffer); + valueBytes.writeTo(theBuffer); + theBuffer.flip(); + + return new GenericIndexed<>(theBuffer.asReadOnlyBuffer(), strategy, allowReverseLookup); + } + + private long getSerializedSizeVersionOne() + { + return theBuffer.remaining() + + 1 // version byte + + 1 // allowReverseLookup flag + + Ints.BYTES // numBytesUsed + + Ints.BYTES; // numElements + } + + private T getVersionOne(int index) + { + checkIndex(index); + + final int startOffset; + final int endOffset; + + if (index == 0) { + startOffset = Ints.BYTES; + endOffset = headerBuffer.getInt(0); + } else { + int headerPosition = (index - 1) * Ints.BYTES; + startOffset = headerBuffer.getInt(headerPosition) + Ints.BYTES; + endOffset = headerBuffer.getInt(headerPosition + Ints.BYTES); + } + return copyBufferAndGet(firstValueBuffer, startOffset, endOffset); + } + + private BufferIndexed singleThreadedVersionOne() + { + final ByteBuffer copyBuffer = firstValueBuffer.asReadOnlyBuffer(); + return new BufferIndexed() + { + @Override + public T get(final int index) + { + checkIndex(index); + + final int startOffset; + final int endOffset; + + if (index == 0) { + startOffset = 4; + endOffset = headerBuffer.getInt(0); + } else { + int headerPosition = (index - 1) * Ints.BYTES; + startOffset = 
headerBuffer.getInt(headerPosition) + Ints.BYTES; + endOffset = headerBuffer.getInt(headerPosition + Ints.BYTES); + } + return bufferedIndexedGet(copyBuffer, startOffset, endOffset); + } + }; + } + + private void writeToChannelVersionOne(WritableByteChannel channel) throws IOException + { + channel.write(ByteBuffer.wrap(new byte[]{ + VERSION_ONE, + allowReverseLookup ? REVERSE_LOOKUP_ALLOWED : REVERSE_LOOKUP_DISALLOWED + })); + channel.write(ByteBuffer.wrap(Ints.toByteArray(theBuffer.remaining() + Ints.BYTES))); + channel.write(ByteBuffer.wrap(Ints.toByteArray(size))); + channel.write(theBuffer.asReadOnlyBuffer()); + } + + + /////////////// + // VERSION TWO + /////////////// + + private static GenericIndexed createGenericIndexedVersionTwo( + ByteBuffer byteBuffer, + ObjectStrategy strategy, + SmooshedFileMapper fileMapper + ) + { + if (fileMapper == null) { + throw new IAE("SmooshedFileMapper can not be null for version 2."); + } + boolean allowReverseLookup = byteBuffer.get() == REVERSE_LOOKUP_ALLOWED; + int logBaseTwoOfElementsPerValueFile = byteBuffer.getInt(); + int numElements = byteBuffer.getInt(); + + try { + String columnName = SERIALIZER_UTILS.readString(byteBuffer); + int elementsPerValueFile = 1 << logBaseTwoOfElementsPerValueFile; + int numberOfFilesRequired = getNumberOfFilesRequired(elementsPerValueFile, numElements); + ByteBuffer[] valueBuffersToUse = new ByteBuffer[numberOfFilesRequired]; + for (int i = 0; i < numberOfFilesRequired; i++) { + // SmooshedFileMapper.mapFile() contract guarantees that the valueBuffer's limit equals to capacity. 
+ ByteBuffer valueBuffer = fileMapper.mapFile(GenericIndexedWriter.generateValueFileName(columnName, i)); + valueBuffersToUse[i] = valueBuffer.asReadOnlyBuffer(); + } + ByteBuffer headerBuffer = fileMapper.mapFile(GenericIndexedWriter.generateHeaderFileName(columnName)); + return new GenericIndexed<>( + valueBuffersToUse, + headerBuffer, + strategy, + allowReverseLookup, + logBaseTwoOfElementsPerValueFile, + numElements + ); + } + catch (IOException e) { + throw new RuntimeException("File mapping failed.", e); + } + } + + private T getVersionTwo(int index) + { + checkIndex(index); + + final int startOffset; + final int endOffset; + + int relativePositionOfIndex = index & relativeIndexMask; + if (relativePositionOfIndex == 0) { + int headerPosition = index * Ints.BYTES; + startOffset = Ints.BYTES; + endOffset = headerBuffer.getInt(headerPosition); + } else { + int headerPosition = (index - 1) * Ints.BYTES; + startOffset = headerBuffer.getInt(headerPosition) + Ints.BYTES; + endOffset = headerBuffer.getInt(headerPosition + Ints.BYTES); + } + int fileNum = index >> logBaseTwoOfElementsPerValueFile; + return copyBufferAndGet(valueBuffers[fileNum], startOffset, endOffset); + } + + private BufferIndexed singleThreadedVersionTwo() + { + final ByteBuffer[] copyValueBuffers = new ByteBuffer[valueBuffers.length]; + for (int i = 0; i < valueBuffers.length; i++) { + copyValueBuffers[i] = valueBuffers[i].asReadOnlyBuffer(); + } + + return new BufferIndexed() + { + @Override + public T get(final int index) + { + checkIndex(index); + + final int startOffset; + final int endOffset; + + int relativePositionOfIndex = index & relativeIndexMask; + if (relativePositionOfIndex == 0) { + int headerPosition = index * Ints.BYTES; + startOffset = 4; + endOffset = headerBuffer.getInt(headerPosition); + } else { + int headerPosition = (index - 1) * Ints.BYTES; + startOffset = headerBuffer.getInt(headerPosition) + Ints.BYTES; + endOffset = headerBuffer.getInt(headerPosition + Ints.BYTES); + } + 
int fileNum = index >> logBaseTwoOfElementsPerValueFile; + return bufferedIndexedGet(copyValueBuffers[fileNum], startOffset, endOffset); + } + }; + } } diff --git a/processing/src/main/java/io/druid/segment/data/GenericIndexedWriter.java b/processing/src/main/java/io/druid/segment/data/GenericIndexedWriter.java index c5928fdffb6..d2e91f094f3 100644 --- a/processing/src/main/java/io/druid/segment/data/GenericIndexedWriter.java +++ b/processing/src/main/java/io/druid/segment/data/GenericIndexedWriter.java @@ -27,6 +27,7 @@ import com.google.common.io.CountingOutputStream; import com.google.common.io.InputSupplier; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; +import io.druid.common.utils.SerializerUtils; import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; import io.druid.java.util.common.io.smoosh.FileSmoosher; @@ -66,6 +67,7 @@ public class GenericIndexedWriter implements Closeable private long numWritten = 0; private boolean requireMultipleFiles = false; private ByteBuffer buf; + private final ByteBuffer sizeHelperBuffer = ByteBuffer.allocate(Ints.BYTES); public GenericIndexedWriter( @@ -122,20 +124,6 @@ public class GenericIndexedWriter implements Closeable } } - private static void writeLongValueToOutputStream(ByteBuffer helperBuf, long value, CountingOutputStream outLong) - throws IOException - { - helperBuf.putLong(0, value); - outLong.write(helperBuf.array()); - } - - private static void writeIntValueToOutputStream(ByteBuffer helperBuf, int value, CountingOutputStream outLong) - throws IOException - { - helperBuf.putInt(0, value); - outLong.write(helperBuf.array()); - } - public void open() throws IOException { headerOut = new CountingOutputStream(ioPeon.makeOutputStream(makeFilename("header"))); @@ -151,13 +139,13 @@ public class GenericIndexedWriter implements Closeable byte[] bytesToWrite = strategy.toBytes(objectToWrite); ++numWritten; - valuesOut.write(Ints.toByteArray(bytesToWrite.length)); 
+ SerializerUtils.writeBigEndianIntToOutputStream(valuesOut, bytesToWrite.length, sizeHelperBuffer); valuesOut.write(bytesToWrite); if (!requireMultipleFiles) { - writeIntValueToOutputStream(buf, Ints.checkedCast(valuesOut.getCount()), headerOut); + SerializerUtils.writeBigEndianIntToOutputStream(headerOut, Ints.checkedCast(valuesOut.getCount()), buf); } else { - writeLongValueToOutputStream(buf, valuesOut.getCount(), headerOutLong); + SerializerUtils.writeNativeOrderedLongToOutputStream(headerOutLong, valuesOut.getCount(), buf); } if (!requireMultipleFiles && getSerializedSize() > fileSizeLimit) { @@ -205,7 +193,7 @@ public class GenericIndexedWriter implements Closeable try (OutputStream metaOut = ioPeon.makeOutputStream(makeFilename("meta"))) { metaOut.write(GenericIndexed.VERSION_ONE); - metaOut.write(objectsSorted ? 0x1 : 0x0); + metaOut.write(objectsSorted ? GenericIndexed.REVERSE_LOOKUP_ALLOWED : GenericIndexed.REVERSE_LOOKUP_DISALLOWED); metaOut.write(Ints.toByteArray(Ints.checkedCast(numBytesWritten + 4))); metaOut.write(Ints.toByteArray(Ints.checkedCast(numWritten))); } @@ -366,7 +354,7 @@ public class GenericIndexedWriter implements Closeable int bagSizePower = bagSizePower(); OutputStream metaOut = Channels.newOutputStream(channel); metaOut.write(GenericIndexed.VERSION_TWO); - metaOut.write(objectsSorted ? 0x1 : 0x0); + metaOut.write(objectsSorted ? 
GenericIndexed.REVERSE_LOOKUP_ALLOWED : GenericIndexed.REVERSE_LOOKUP_DISALLOWED); metaOut.write(Ints.toByteArray(bagSizePower)); metaOut.write(Ints.toByteArray(Ints.checkedCast(numWritten))); metaOut.write(Ints.toByteArray(fileNameByteArray.length)); @@ -436,7 +424,11 @@ public class GenericIndexedWriter implements Closeable } currentNumBytes = Long.reverseBytes(headerFile.readLong()); relativeNumBytes = currentNumBytes - relativeRefBytes; - writeIntValueToOutputStream(helperBuffer, Ints.checkedCast(relativeNumBytes), finalHeaderOut); + SerializerUtils.writeNativeOrderedIntToOutputStream( + finalHeaderOut, + Ints.checkedCast(relativeNumBytes), + helperBuffer + ); } long numBytesToPutInFile = finalHeaderOut.getCount(); @@ -460,7 +452,7 @@ public class GenericIndexedWriter implements Closeable ByteBuffer buf = ByteBuffer.allocate(Longs.BYTES).order(ByteOrder.nativeOrder()); for (int i = 0; i < numWritten; i++) { int count = headerFile.readInt(); - writeLongValueToOutputStream(buf, count, headerOutLong); + SerializerUtils.writeNativeOrderedLongToOutputStream(headerOutLong, count, buf); } } } diff --git a/processing/src/main/java/io/druid/segment/data/IndexedRTree.java b/processing/src/main/java/io/druid/segment/data/IndexedRTree.java index ba1ea9123aa..b306e91d92f 100644 --- a/processing/src/main/java/io/druid/segment/data/IndexedRTree.java +++ b/processing/src/main/java/io/druid/segment/data/IndexedRTree.java @@ -86,9 +86,8 @@ public class IndexedRTree implements Comparable @Override public ImmutableRTree fromByteBuffer(ByteBuffer buffer, int numBytes) { - final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer(); - readOnlyBuffer.limit(readOnlyBuffer.position() + numBytes); - return new ImmutableRTree(readOnlyBuffer, bitmapFactory); + buffer.limit(buffer.position() + numBytes); + return new ImmutableRTree(buffer, bitmapFactory); } @Override diff --git a/processing/src/main/java/io/druid/segment/data/IntBufferIndexedInts.java 
b/processing/src/main/java/io/druid/segment/data/IntBufferIndexedInts.java deleted file mode 100644 index e8712bcd3ca..00000000000 --- a/processing/src/main/java/io/druid/segment/data/IntBufferIndexedInts.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package io.druid.segment.data; - -import com.google.common.collect.Ordering; -import com.google.common.primitives.Ints; -import io.druid.collections.IntList; -import it.unimi.dsi.fastutil.ints.IntIterator; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.IntBuffer; - -/** - */ -public class IntBufferIndexedInts implements IndexedInts, Comparable -{ - public static ObjectStrategy objectStrategy = - new IntBufferIndexedIntsObjectStrategy(); - - public static IntBufferIndexedInts fromArray(int[] array) - { - final ByteBuffer buffer = ByteBuffer.allocate(array.length * Ints.BYTES); - buffer.asIntBuffer().put(array); - - return new IntBufferIndexedInts(buffer.asReadOnlyBuffer()); - } - - public static IntBufferIndexedInts fromIntList(IntList intList) - { - final ByteBuffer buffer = ByteBuffer.allocate(intList.length() * Ints.BYTES); - final IntBuffer intBuf = buffer.asIntBuffer(); - - for (int i = 0; i < intList.length(); ++i) { - intBuf.put(intList.get(i)); - } - - return new IntBufferIndexedInts(buffer.asReadOnlyBuffer()); - } - - private final ByteBuffer buffer; - - public IntBufferIndexedInts(ByteBuffer buffer) - { - this.buffer = buffer; - } - - @Override - public int size() - { - return buffer.remaining() / 4; - } - - @Override - public int get(int index) - { - return buffer.getInt(buffer.position() + (index * 4)); - } - - public ByteBuffer getBuffer() - { - return buffer.asReadOnlyBuffer(); - } - - @Override - public int compareTo(IntBufferIndexedInts o) - { - return buffer.compareTo(o.getBuffer()); - } - - @Override - public IntIterator iterator() - { - return new IndexedIntsIterator(this); - } - - private static class IntBufferIndexedIntsObjectStrategy implements ObjectStrategy - { - @Override - public Class getClazz() - { - return IntBufferIndexedInts.class; - } - - @Override - public IntBufferIndexedInts fromByteBuffer(ByteBuffer buffer, int numBytes) - { - final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer(); - 
readOnlyBuffer.limit(readOnlyBuffer.position() + numBytes); - return new IntBufferIndexedInts(readOnlyBuffer); - } - - @Override - public byte[] toBytes(IntBufferIndexedInts val) - { - ByteBuffer buffer = val.getBuffer(); - byte[] bytes = new byte[buffer.remaining()]; - buffer.get(bytes); - - return bytes; - } - - @Override - public int compare(IntBufferIndexedInts o1, IntBufferIndexedInts o2) - { - return Ordering.natural().nullsFirst().compare(o1, o2); - } - } - - @Override - public void fill(int index, int[] toFill) - { - throw new UnsupportedOperationException("fill not supported"); - } - - @Override - public void close() throws IOException - { - - } -} diff --git a/processing/src/main/java/io/druid/segment/data/IntermediateLongSupplierSerializer.java b/processing/src/main/java/io/druid/segment/data/IntermediateLongSupplierSerializer.java index 478d1f6777b..3cf761b2b6b 100644 --- a/processing/src/main/java/io/druid/segment/data/IntermediateLongSupplierSerializer.java +++ b/processing/src/main/java/io/druid/segment/data/IntermediateLongSupplierSerializer.java @@ -19,20 +19,27 @@ package io.druid.segment.data; -import com.google.common.collect.BiMap; -import com.google.common.collect.HashBiMap; import com.google.common.io.ByteSink; import com.google.common.io.CountingOutputStream; import com.google.common.math.LongMath; import com.google.common.primitives.Longs; +import io.druid.common.utils.SerializerUtils; import io.druid.java.util.common.io.smoosh.FileSmoosher; +import it.unimi.dsi.fastutil.longs.Long2IntMap; +import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap; +import it.unimi.dsi.fastutil.longs.LongArrayList; +import it.unimi.dsi.fastutil.longs.LongList; import java.io.BufferedInputStream; import java.io.DataInputStream; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.WritableByteChannel; +/** + * Unsafe for concurrent use from multiple threads. 
+ */ public class IntermediateLongSupplierSerializer implements LongSupplierSerializer { @@ -42,10 +49,13 @@ public class IntermediateLongSupplierSerializer implements LongSupplierSerialize private final ByteOrder order; private final CompressedObjectStrategy.CompressionStrategy compression; private CountingOutputStream tempOut = null; + private final ByteBuffer helperBuffer = ByteBuffer.allocate(Longs.BYTES); private int numInserted = 0; - private BiMap uniqueValues = HashBiMap.create(); + private final Long2IntMap uniqueValues = new Long2IntOpenHashMap(); + private final LongList valuesAddedInOrder = new LongArrayList(); + private long maxVal = Long.MIN_VALUE; private long minVal = Long.MAX_VALUE; @@ -77,10 +87,11 @@ public class IntermediateLongSupplierSerializer implements LongSupplierSerialize public void add(long value) throws IOException { - tempOut.write(Longs.toByteArray(value)); + SerializerUtils.writeBigEndianLongToOutputStream(tempOut, value, helperBuffer); ++numInserted; if (uniqueValues.size() <= CompressionFactory.MAX_TABLE_SIZE && !uniqueValues.containsKey(value)) { uniqueValues.put(value, uniqueValues.size()); + valuesAddedInOrder.add(value); } if (value > maxVal) { maxVal = value; @@ -101,7 +112,7 @@ public class IntermediateLongSupplierSerializer implements LongSupplierSerialize delta = -1; } if (uniqueValues.size() <= CompressionFactory.MAX_TABLE_SIZE) { - writer = new TableLongEncodingWriter(uniqueValues); + writer = new TableLongEncodingWriter(uniqueValues, valuesAddedInOrder); } else if (delta != -1 && delta != Long.MAX_VALUE) { writer = new DeltaLongEncodingWriter(minVal, delta); } else { diff --git a/processing/src/main/java/io/druid/segment/data/ObjectStrategy.java b/processing/src/main/java/io/druid/segment/data/ObjectStrategy.java index 5778a82e89e..a0ab34ba6bd 100644 --- a/processing/src/main/java/io/druid/segment/data/ObjectStrategy.java +++ b/processing/src/main/java/io/druid/segment/data/ObjectStrategy.java @@ -29,8 +29,11 @@ public 
interface ObjectStrategy extends Comparator /** * Convert values from their underlying byte representation. * - * Implementations of this method must not change the given buffer mark, or limit, but may modify its position. - * Use buffer.asReadOnlyBuffer() or buffer.duplicate() if mark or limit need to be set. + * Implementations of this method may change the given buffer's mark, or limit, and position. + * + * Implementations of this method may not store the given buffer in a field of the "deserialized" object, + * need to use {@link ByteBuffer#slice()}, {@link ByteBuffer#asReadOnlyBuffer()} or {@link ByteBuffer#duplicate()} in + * this case. * * @param buffer buffer to read value from * @param numBytes number of bytes used to store the value, starting at buffer.position() diff --git a/processing/src/main/java/io/druid/segment/data/RoaringBitmapSerdeFactory.java b/processing/src/main/java/io/druid/segment/data/RoaringBitmapSerdeFactory.java index 9c2ae1d21a8..dc7c45ad70a 100644 --- a/processing/src/main/java/io/druid/segment/data/RoaringBitmapSerdeFactory.java +++ b/processing/src/main/java/io/druid/segment/data/RoaringBitmapSerdeFactory.java @@ -102,9 +102,8 @@ public class RoaringBitmapSerdeFactory implements BitmapSerdeFactory @Override public ImmutableBitmap fromByteBuffer(ByteBuffer buffer, int numBytes) { - final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer(); - readOnlyBuffer.limit(readOnlyBuffer.position() + numBytes); - return new WrappedImmutableRoaringBitmap(new ImmutableRoaringBitmap(readOnlyBuffer)); + buffer.limit(buffer.position() + numBytes); + return new WrappedImmutableRoaringBitmap(new ImmutableRoaringBitmap(buffer)); } @Override diff --git a/processing/src/main/java/io/druid/segment/data/TableLongEncodingWriter.java b/processing/src/main/java/io/druid/segment/data/TableLongEncodingWriter.java index 9d0e4e48ba1..69fde7e03fb 100644 --- a/processing/src/main/java/io/druid/segment/data/TableLongEncodingWriter.java +++ 
b/processing/src/main/java/io/druid/segment/data/TableLongEncodingWriter.java @@ -19,10 +19,12 @@ package io.druid.segment.data; -import com.google.common.collect.BiMap; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; +import io.druid.common.utils.SerializerUtils; import io.druid.java.util.common.IAE; +import it.unimi.dsi.fastutil.longs.Long2IntMap; +import it.unimi.dsi.fastutil.longs.LongList; import java.io.IOException; import java.io.OutputStream; @@ -31,16 +33,18 @@ import java.nio.ByteBuffer; public class TableLongEncodingWriter implements CompressionFactory.LongEncodingWriter { - private final BiMap table; + private final Long2IntMap table; + private final LongList valueAddedInOrder; private final int bitsPerValue; private VSizeLongSerde.LongSerializer serializer; - public TableLongEncodingWriter(BiMap table) + public TableLongEncodingWriter(Long2IntMap table, LongList valueAddedInOrder) { if (table.size() > CompressionFactory.MAX_TABLE_SIZE) { throw new IAE("Invalid table size[%s]", table.size()); } this.table = table; + this.valueAddedInOrder = valueAddedInOrder; this.bitsPerValue = VSizeLongSerde.getBitsForMax(table.size()); } @@ -77,9 +81,9 @@ public class TableLongEncodingWriter implements CompressionFactory.LongEncodingW metaOut.write(CompressionFactory.LongEncodingFormat.TABLE.getId()); metaOut.write(CompressionFactory.TABLE_ENCODING_VERSION); metaOut.write(Ints.toByteArray(table.size())); - BiMap inverse = table.inverse(); - for (int i = 0; i < table.size(); i++) { - metaOut.write(Longs.toByteArray(inverse.get(i))); + ByteBuffer helperBuffer = ByteBuffer.allocate(Longs.BYTES); + for (int i = 0; i < valueAddedInOrder.size(); i++) { + SerializerUtils.writeBigEndianLongToOutputStream(metaOut, valueAddedInOrder.getLong(i), helperBuffer); } } diff --git a/processing/src/main/java/io/druid/segment/data/VSizeIndexed.java b/processing/src/main/java/io/druid/segment/data/VSizeIndexed.java index 4696cc97bef..873c5675216 
100644 --- a/processing/src/main/java/io/druid/segment/data/VSizeIndexed.java +++ b/processing/src/main/java/io/druid/segment/data/VSizeIndexed.java @@ -20,10 +20,11 @@ package io.druid.segment.data; import com.google.common.primitives.Ints; +import io.druid.common.utils.SerializerUtils; +import io.druid.io.ZeroCopyByteArrayOutputStream; import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; @@ -55,12 +56,12 @@ public class VSizeIndexed implements IndexedMultivalue ++count; } - ByteArrayOutputStream headerBytes = new ByteArrayOutputStream(4 + (count * 4)); - ByteArrayOutputStream valueBytes = new ByteArrayOutputStream(); + ZeroCopyByteArrayOutputStream headerBytes = new ZeroCopyByteArrayOutputStream(4 + (count * 4)); + ZeroCopyByteArrayOutputStream valueBytes = new ZeroCopyByteArrayOutputStream(); + ByteBuffer helperBuffer = ByteBuffer.allocate(Ints.BYTES); int offset = 0; - try { - headerBytes.write(Ints.toByteArray(count)); + SerializerUtils.writeBigEndianIntToOutputStream(headerBytes, count, helperBuffer); for (VSizeIndexedInts object : objectsIterable) { if (object.getNumBytes() != numBytes) { @@ -68,7 +69,7 @@ public class VSizeIndexed implements IndexedMultivalue } byte[] bytes = object.getBytesNoPadding(); offset += bytes.length; - headerBytes.write(Ints.toByteArray(offset)); + SerializerUtils.writeBigEndianIntToOutputStream(headerBytes, offset, helperBuffer); valueBytes.write(bytes); } valueBytes.write(new byte[4 - numBytes]); @@ -78,8 +79,8 @@ public class VSizeIndexed implements IndexedMultivalue } ByteBuffer theBuffer = ByteBuffer.allocate(headerBytes.size() + valueBytes.size()); - theBuffer.put(headerBytes.toByteArray()); - theBuffer.put(valueBytes.toByteArray()); + headerBytes.writeTo(theBuffer); + valueBytes.writeTo(theBuffer); theBuffer.flip(); return new 
VSizeIndexed(theBuffer.asReadOnlyBuffer(), numBytes); diff --git a/processing/src/main/java/io/druid/segment/data/VSizeIndexedInts.java b/processing/src/main/java/io/druid/segment/data/VSizeIndexedInts.java index f8f9c9fdcf1..95b72122aeb 100644 --- a/processing/src/main/java/io/druid/segment/data/VSizeIndexedInts.java +++ b/processing/src/main/java/io/druid/segment/data/VSizeIndexedInts.java @@ -53,7 +53,7 @@ public class VSizeIndexedInts implements IndexedInts, Comparable list, int maxValue) + public static byte[] getBytesNoPaddingFromList(List list, int maxValue) { int numBytes = getNumBytesForMax(maxValue); @@ -76,6 +76,7 @@ public class VSizeIndexedInts implements IndexedInts, Comparable list, int numBytes, int maxValue) { int i = 0; + ByteBuffer helperBuffer = ByteBuffer.allocate(Ints.BYTES); for (Integer val : list) { if (val < 0) { throw new IAE("integer values must be positive, got[%d], i[%d]", val, i); @@ -84,8 +85,8 @@ public class VSizeIndexedInts implements IndexedInts, Comparable maxValue[%d], please don't lie about maxValue. 
i[%d]", val, maxValue, i); } - byte[] intAsBytes = Ints.toByteArray(val); - buffer.put(intAsBytes, intAsBytes.length - numBytes, numBytes); + helperBuffer.putInt(0, val); + buffer.put(helperBuffer.array(), Ints.BYTES - numBytes, numBytes); ++i; } buffer.position(0); diff --git a/processing/src/main/java/io/druid/segment/data/VSizeIndexedIntsWriter.java b/processing/src/main/java/io/druid/segment/data/VSizeIndexedIntsWriter.java index c6a56e61351..727072aa075 100644 --- a/processing/src/main/java/io/druid/segment/data/VSizeIndexedIntsWriter.java +++ b/processing/src/main/java/io/druid/segment/data/VSizeIndexedIntsWriter.java @@ -42,6 +42,7 @@ public class VSizeIndexedIntsWriter extends SingleValueIndexedIntsWriter private final int numBytes; private CountingOutputStream valuesOut = null; + private final ByteBuffer helperBuffer = ByteBuffer.allocate(Ints.BYTES); public VSizeIndexedIntsWriter( final IOPeon ioPeon, @@ -63,8 +64,8 @@ public class VSizeIndexedIntsWriter extends SingleValueIndexedIntsWriter @Override protected void addValue(int val) throws IOException { - byte[] intAsBytes = Ints.toByteArray(val); - valuesOut.write(intAsBytes, intAsBytes.length - numBytes, numBytes); + helperBuffer.putInt(0, val); + valuesOut.write(helperBuffer.array(), Ints.BYTES - numBytes, numBytes); } @Override diff --git a/processing/src/main/java/io/druid/segment/data/VSizeIndexedWriter.java b/processing/src/main/java/io/druid/segment/data/VSizeIndexedWriter.java index 2746f75afe4..37ea293acd1 100644 --- a/processing/src/main/java/io/druid/segment/data/VSizeIndexedWriter.java +++ b/processing/src/main/java/io/druid/segment/data/VSizeIndexedWriter.java @@ -84,7 +84,7 @@ public class VSizeIndexedWriter extends MultiValueIndexedIntsWriter implements C public void write(List ints) throws IOException { - byte[] bytesToWrite = ints == null ? EMPTY_ARRAY : VSizeIndexedInts.getBytesNoPaddingfromList(ints, maxId); + byte[] bytesToWrite = ints == null ? 
EMPTY_ARRAY : VSizeIndexedInts.getBytesNoPaddingFromList(ints, maxId); valuesOut.write(bytesToWrite); diff --git a/processing/src/test/java/io/druid/segment/data/IndexedIntsTest.java b/processing/src/test/java/io/druid/segment/data/IndexedIntsTest.java index d95a2d21877..3e8e7398f9f 100644 --- a/processing/src/test/java/io/druid/segment/data/IndexedIntsTest.java +++ b/processing/src/test/java/io/druid/segment/data/IndexedIntsTest.java @@ -42,8 +42,7 @@ public class IndexedIntsTest { return Arrays.asList( new Object[][]{ - {VSizeIndexedInts.fromArray(array)}, - {IntBufferIndexedInts.fromArray(array)} + {VSizeIndexedInts.fromArray(array)} } ); } diff --git a/processing/src/test/java/io/druid/segment/data/VSizeIndexedIntsTest.java b/processing/src/test/java/io/druid/segment/data/VSizeIndexedIntsTest.java index 3894f0e8a49..c8ebcf1290b 100644 --- a/processing/src/test/java/io/druid/segment/data/VSizeIndexedIntsTest.java +++ b/processing/src/test/java/io/druid/segment/data/VSizeIndexedIntsTest.java @@ -73,7 +73,7 @@ public class VSizeIndexedIntsTest int maxValue = Ints.max(array); VSizeIndexedInts ints = VSizeIndexedInts.fromList(list, maxValue); byte[] bytes1 = ints.getBytesNoPadding(); - byte[] bytes2 = VSizeIndexedInts.getBytesNoPaddingfromList(list, maxValue); + byte[] bytes2 = VSizeIndexedInts.getBytesNoPaddingFromList(list, maxValue); Assert.assertArrayEquals(bytes1, bytes2); } }