GenericIndexed minor bug fixes, optimizations and refactoring (#3951)

* Minor bug fixes in GenericIndexed; Refactor and optimize GenericIndexed; Remove some unnecessary ByteBuffer duplications in some deserialization paths; Add ZeroCopyByteArrayOutputStream

* Fixes

* Move GenericIndexedWriter.writeLongValueToOutputStream() and writeIntValueToOutputStream() to SerializerUtils

* Move constructors

* Add GenericIndexedBenchmark

* Comments

* Typo

* Note in Javadoc that IntermediateLongSupplierSerializer, LongColumnSerializer and LongMetricColumnSerializer are thread-unsafe

* Use primitive collections in IntermediateLongSupplierSerializer instead of BiMap

* Optimize TableLongEncodingWriter

* Add checks to SerializerUtils methods

* Don't restrict byte order in SerializerUtils.writeLongToOutputStream() and writeIntToOutputStream()

* Update GenericIndexedBenchmark

* SerializerUtils.writeIntToOutputStream() and writeLongToOutputStream() separate for big-endian and native-endian

* Add GenericIndexedBenchmark.indexOf()

* More checks in methods in SerializerUtils

* Use helperBuffer.arrayOffset()

* Optimizations in SerializerUtils
This commit is contained in:
Roman Leventov 2017-03-27 13:17:31 -06:00 committed by Himanshu
parent 117c698c59
commit 73d9b31664
31 changed files with 820 additions and 640 deletions

View File

@ -0,0 +1,174 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.benchmark;
import com.google.common.io.Files;
import com.google.common.primitives.Ints;
import io.druid.java.util.common.io.smoosh.FileSmoosher;
import io.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import io.druid.segment.data.GenericIndexed;
import io.druid.segment.data.GenericIndexedWriter;
import io.druid.segment.data.ObjectStrategy;
import io.druid.segment.data.TmpFileIOPeon;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OperationsPerInvocation;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@OperationsPerInvocation(GenericIndexedBenchmark.ITERATIONS)
@Warmup(iterations = 5)
@Measurement(iterations = 20)
@Fork(1)
@State(Scope.Benchmark)
public class GenericIndexedBenchmark
{
public static final int ITERATIONS = 10000;
static final ObjectStrategy<byte[]> byteArrayStrategy = new ObjectStrategy<byte[]>()
{
@Override
public Class<? extends byte[]> getClazz()
{
return byte[].class;
}
@Override
public byte[] fromByteBuffer(ByteBuffer buffer, int numBytes)
{
byte[] result = new byte[numBytes];
buffer.get(result);
return result;
}
@Override
public byte[] toBytes(byte[] val)
{
return val;
}
@Override
public int compare(byte[] o1, byte[] o2)
{
return Integer.compare(Ints.fromByteArray(o1), Ints.fromByteArray(o2));
}
};
@Param({"10000"})
public int n;
@Param({"8"})
public int elementSize;
private File file;
private File smooshDir;
private GenericIndexed<byte[]> genericIndexed;
private int[] iterationIndexes;
private byte[][] elementsToSearch;
@Setup(Level.Trial)
public void createGenericIndexed() throws IOException
{
GenericIndexedWriter<byte[]> genericIndexedWriter = new GenericIndexedWriter<>(
new TmpFileIOPeon(),
"genericIndexedBenchmark",
byteArrayStrategy
);
genericIndexedWriter.open();
// GenericIndexObject caches prevObject for comparison, so need two arrays for correct objectsSorted computation.
ByteBuffer[] elements = new ByteBuffer[2];
elements[0] = ByteBuffer.allocate(elementSize);
elements[1] = ByteBuffer.allocate(elementSize);
for (int i = 0; i < n; i++) {
ByteBuffer element = elements[i & 1];
element.putInt(0, i);
genericIndexedWriter.write(element.array());
}
genericIndexedWriter.close();
smooshDir = Files.createTempDir();
file = File.createTempFile("genericIndexedBenchmark", "meta");
try (FileChannel fileChannel =
FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
FileSmoosher fileSmoosher = new FileSmoosher(smooshDir)) {
genericIndexedWriter.writeToChannel(fileChannel, fileSmoosher);
}
FileChannel fileChannel = FileChannel.open(file.toPath());
MappedByteBuffer byteBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, file.length());
genericIndexed = GenericIndexed.read(byteBuffer, byteArrayStrategy, SmooshedFileMapper.load(smooshDir));
}
@Setup(Level.Trial)
public void createIterationIndexes()
{
iterationIndexes = new int[ITERATIONS];
for (int i = 0; i < ITERATIONS; i++) {
iterationIndexes[i] = ThreadLocalRandom.current().nextInt(n);
}
}
@Setup(Level.Trial)
public void createElementsToSearch()
{
elementsToSearch = new byte[ITERATIONS][];
for (int i = 0; i < ITERATIONS; i++) {
elementsToSearch[i] = Ints.toByteArray(ThreadLocalRandom.current().nextInt(n));
}
}
@Benchmark
public void get(Blackhole bh)
{
for (int i : iterationIndexes) {
bh.consume(genericIndexed.get(i));
}
}
@Benchmark
public int indexOf()
{
int r = 0;
for (byte[] elementToSearch : elementsToSearch) {
r ^= genericIndexed.indexOf(elementToSearch);
}
return r;
}
}

View File

@ -34,7 +34,7 @@ public class WrappedImmutableConciseBitmap implements ImmutableBitmap
public WrappedImmutableConciseBitmap(ByteBuffer byteBuffer)
{
this.bitmap = new ImmutableConciseSet(byteBuffer.asReadOnlyBuffer());
this.bitmap = new ImmutableConciseSet(byteBuffer);
}
/**

View File

@ -36,7 +36,7 @@ public class WrappedImmutableRoaringBitmap implements ImmutableBitmap
protected WrappedImmutableRoaringBitmap(ByteBuffer byteBuffer)
{
this.bitmap = new ImmutableRoaringBitmap(byteBuffer.asReadOnlyBuffer());
this.bitmap = new ImmutableRoaringBitmap(byteBuffer);
}
/**

View File

@ -50,6 +50,7 @@ public class ImmutableRTree
public ImmutableRTree(ByteBuffer data, BitmapFactory bitmapFactory)
{
data = data.asReadOnlyBuffer();
final int initPosition = data.position();
Preconditions.checkArgument(data.get(0) == VERSION, "Mismatching versions");
this.numDims = data.getInt(1 + initPosition) & 0x7FFF;
@ -69,7 +70,7 @@ public class ImmutableRTree
buffer.putInt(rTree.getNumDims());
rTree.getRoot().storeInByteBuffer(buffer, buffer.position());
buffer.position(0);
return new ImmutableRTree(buffer.asReadOnlyBuffer(), rTree.getBitmapFactory());
return new ImmutableRTree(buffer, rTree.getBitmapFactory());
}
private static int calcNumBytes(RTree tree)

View File

@ -21,14 +21,16 @@ package io.druid.common.utils;
import com.google.common.io.ByteStreams;
import com.google.common.io.OutputSupplier;
import com.google.common.primitives.Floats;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import io.druid.collections.IntList;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.Charset;
import java.util.Arrays;
@ -38,6 +40,70 @@ public class SerializerUtils
{
private static final Charset UTF8 = Charset.forName("UTF-8");
/**
* Writes the given long value into the given OutputStream in big-endian byte order, using the helperBuffer. Faster
* alternative to out.write(Longs.toByteArray(value)), more convenient (sometimes) than wrapping the OutputStream into
* {@link java.io.DataOutputStream}.
*
* @param helperBuffer a big-endian heap ByteBuffer with capacity of at least 8
*/
public static void writeBigEndianLongToOutputStream(OutputStream out, long value, ByteBuffer helperBuffer)
throws IOException
{
if (helperBuffer.order() != ByteOrder.BIG_ENDIAN || !helperBuffer.hasArray()) {
throw new IllegalArgumentException("Expected writable, big-endian, heap byteBuffer");
}
helperBuffer.putLong(0, value);
out.write(helperBuffer.array(), helperBuffer.arrayOffset(), Longs.BYTES);
}
/**
* Writes the given long value into the given OutputStream in the native byte order, using the helperBuffer.
*
* @param helperBuffer a heap ByteBuffer with capacity of at least 8, with the native byte order
*/
public static void writeNativeOrderedLongToOutputStream(OutputStream out, long value, ByteBuffer helperBuffer)
throws IOException
{
if (helperBuffer.order() != ByteOrder.nativeOrder() || !helperBuffer.hasArray()) {
throw new IllegalArgumentException("Expected writable heap byteBuffer with the native byte order");
}
helperBuffer.putLong(0, value);
out.write(helperBuffer.array(), helperBuffer.arrayOffset(), Longs.BYTES);
}
/**
* Writes the given int value into the given OutputStream in big-endian byte order, using the helperBuffer. Faster
* alternative to out.write(Ints.toByteArray(value)), more convenient (sometimes) than wrapping the OutputStream into
* {@link java.io.DataOutputStream}.
*
* @param helperBuffer a big-endian heap ByteBuffer with capacity of at least 4
*/
public static void writeBigEndianIntToOutputStream(OutputStream out, int value, ByteBuffer helperBuffer)
throws IOException
{
if (helperBuffer.order() != ByteOrder.BIG_ENDIAN || !helperBuffer.hasArray()) {
throw new IllegalArgumentException("Expected writable, big-endian, heap byteBuffer");
}
helperBuffer.putInt(0, value);
out.write(helperBuffer.array(), helperBuffer.arrayOffset(), Ints.BYTES);
}
/**
* Writes the given int value into the given OutputStream in the native byte order, using the given helperBuffer.
*
* @param helperBuffer a heap ByteBuffer with capacity of at least 4, with the native byte order
*/
public static void writeNativeOrderedIntToOutputStream(OutputStream out, int value, ByteBuffer helperBuffer)
throws IOException
{
if (helperBuffer.order() != ByteOrder.nativeOrder() || !helperBuffer.hasArray()) {
throw new IllegalArgumentException("Expected writable heap byteBuffer with the native byte order");
}
helperBuffer.putInt(0, value);
out.write(helperBuffer.array(), helperBuffer.arrayOffset(), Ints.BYTES);
}
public <T extends OutputStream> void writeString(T out, String name) throws IOException
{
byte[] nameBytes = name.getBytes(UTF8);
@ -122,16 +188,12 @@ public class SerializerUtils
public void writeInt(OutputStream out, int intValue) throws IOException
{
byte[] outBytes = new byte[4];
ByteBuffer.wrap(outBytes).putInt(intValue);
out.write(outBytes);
out.write(Ints.toByteArray(intValue));
}
public void writeInt(WritableByteChannel out, int intValue) throws IOException
{
final ByteBuffer buffer = ByteBuffer.allocate(4);
final ByteBuffer buffer = ByteBuffer.allocate(Ints.BYTES);
buffer.putInt(intValue);
buffer.flip();
out.write(buffer);
@ -139,19 +201,19 @@ public class SerializerUtils
public int readInt(InputStream in) throws IOException
{
byte[] intBytes = new byte[4];
byte[] intBytes = new byte[Ints.BYTES];
ByteStreams.readFully(in, intBytes);
return ByteBuffer.wrap(intBytes).getInt();
return Ints.fromByteArray(intBytes);
}
public void writeInts(OutputStream out, int[] ints) throws IOException
{
writeInt(out, ints.length);
for (int i = 0; i < ints.length; i++) {
writeInt(out, ints[i]);
for (int value : ints) {
writeInt(out, value);
}
}
@ -178,16 +240,12 @@ public class SerializerUtils
public void writeLong(OutputStream out, long longValue) throws IOException
{
byte[] outBytes = new byte[8];
ByteBuffer.wrap(outBytes).putLong(longValue);
out.write(outBytes);
out.write(Longs.toByteArray(longValue));
}
public void writeLong(WritableByteChannel out, long longValue) throws IOException
{
final ByteBuffer buffer = ByteBuffer.allocate(8);
final ByteBuffer buffer = ByteBuffer.allocate(Longs.BYTES);
buffer.putLong(longValue);
buffer.flip();
out.write(buffer);
@ -195,19 +253,19 @@ public class SerializerUtils
public long readLong(InputStream in) throws IOException
{
byte[] longBytes = new byte[8];
byte[] longBytes = new byte[Longs.BYTES];
ByteStreams.readFully(in, longBytes);
return ByteBuffer.wrap(longBytes).getLong();
return Longs.fromByteArray(longBytes);
}
public void writeLongs(OutputStream out, long[] longs) throws IOException
{
writeInt(out, longs.length);
for (int i = 0; i < longs.length; i++) {
writeLong(out, longs[i]);
for (long value : longs) {
writeLong(out, value);
}
}
@ -223,18 +281,14 @@ public class SerializerUtils
return retVal;
}
public void writeFloat(OutputStream out, float intValue) throws IOException
public void writeFloat(OutputStream out, float floatValue) throws IOException
{
byte[] outBytes = new byte[4];
ByteBuffer.wrap(outBytes).putFloat(intValue);
out.write(outBytes);
writeInt(out, Float.floatToRawIntBits(floatValue));
}
public void writeFloat(WritableByteChannel out, float floatValue) throws IOException
{
final ByteBuffer buffer = ByteBuffer.allocate(4);
final ByteBuffer buffer = ByteBuffer.allocate(Floats.BYTES);
buffer.putFloat(floatValue);
buffer.flip();
out.write(buffer);
@ -242,19 +296,15 @@ public class SerializerUtils
public float readFloat(InputStream in) throws IOException
{
byte[] floatBytes = new byte[4];
ByteStreams.readFully(in, floatBytes);
return ByteBuffer.wrap(floatBytes).getFloat();
return Float.intBitsToFloat(readInt(in));
}
public void writeFloats(OutputStream out, float[] floats) throws IOException
{
writeInt(out, floats.length);
for (int i = 0; i < floats.length; i++) {
writeFloat(out, floats[i]);
for (float value : floats) {
writeFloat(out, value);
}
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.io;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
public class ZeroCopyByteArrayOutputStream extends ByteArrayOutputStream
{
public ZeroCopyByteArrayOutputStream()
{
}
public ZeroCopyByteArrayOutputStream(int capacity)
{
super(capacity);
}
public void writeTo(ByteBuffer outputBuffer)
{
outputBuffer.put(buf, 0, count);
}
public void writeTo(WritableByteChannel channel) throws IOException
{
channel.write(ByteBuffer.wrap(buf, 0, count));
}
}

View File

@ -1430,16 +1430,13 @@ public class ApproximateHistogram
*/
public static ApproximateHistogram fromBytes(ByteBuffer buf)
{
ByteBuffer copy = buf.asReadOnlyBuffer();
// negative size indicates compact representation
// this works regardless of whether we use int or short for the size since the leftmost bit is the sign bit
if (copy.getShort(buf.position()) < 0) {
if (buf.getShort(buf.position()) < 0) {
return fromBytesCompact(buf);
} else {
// ignore size
copy.getInt();
// determine if sparse or dense based on sign of binCount
if (copy.getInt() < 0) {
// ignore size, determine if sparse or dense based on sign of binCount
if (buf.getInt(buf.position() + Ints.BYTES) < 0) {
return fromBytesSparse(buf);
} else {
return fromBytesDense(buf);

View File

@ -120,9 +120,8 @@ public class ApproximateHistogramFoldingSerde extends ComplexMetricSerde
@Override
public ApproximateHistogram fromByteBuffer(ByteBuffer buffer, int numBytes)
{
final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
readOnlyBuffer.limit(readOnlyBuffer.position() + numBytes);
return ApproximateHistogram.fromBytes(readOnlyBuffer);
buffer.limit(buffer.position() + numBytes);
return ApproximateHistogram.fromBytes(buffer);
}
@Override

View File

@ -103,9 +103,8 @@ public class VarianceSerde extends ComplexMetricSerde
@Override
public VarianceAggregatorCollector fromByteBuffer(ByteBuffer buffer, int numBytes)
{
final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
readOnlyBuffer.limit(readOnlyBuffer.position() + numBytes);
return VarianceAggregatorCollector.from(readOnlyBuffer);
buffer.limit(buffer.position() + numBytes);
return VarianceAggregatorCollector.from(buffer);
}
@Override

View File

@ -111,6 +111,10 @@ public class SmooshedFileMapper implements Closeable
return internalFiles.keySet();
}
/**
* Returns a mapped buffer of the smooshed file with the given name. Buffer's contents from 0 to capacity() are the
* whole mapped file contents, limit() is equal to capacity().
*/
public ByteBuffer mapFile(String name) throws IOException
{
final Metadata metadata = internalFiles.get(name);

View File

@ -120,6 +120,8 @@ public class HyperUniquesSerde extends ComplexMetricSerde
@Override
public HyperLogLogCollector fromByteBuffer(ByteBuffer buffer, int numBytes)
{
// make a copy of buffer, because the given buffer is not duplicated in HyperLogLogCollector.makeCollector() and
// stored in a field.
final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
readOnlyBuffer.limit(readOnlyBuffer.position() + numBytes);
return HyperLogLogCollector.makeCollector(readOnlyBuffer);

View File

@ -43,6 +43,7 @@ import io.druid.collections.bitmap.ImmutableBitmap;
import io.druid.collections.bitmap.MutableBitmap;
import io.druid.collections.spatial.ImmutableRTree;
import io.druid.common.utils.SerializerUtils;
import io.druid.io.ZeroCopyByteArrayOutputStream;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.io.smoosh.FileSmoosher;
@ -689,17 +690,7 @@ public class IndexIO
final ColumnDescriptor serdeficator = builder
.addSerde(columnPartBuilder.build())
.build();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
serializerUtils.writeString(baos, mapper.writeValueAsString(serdeficator));
byte[] specBytes = baos.toByteArray();
final SmooshedWriter channel = v9Smoosher.addWithSmooshedWriter(
dimension, serdeficator.numBytes() + specBytes.length
);
channel.write(ByteBuffer.wrap(specBytes));
serdeficator.write(channel, v9Smoosher);
channel.close();
makeColumn(v9Smoosher, dimension, serdeficator);
} else if (filename.startsWith("met_") || filename.startsWith("numeric_dim_")) {
// NOTE: identifying numeric dimensions by using a different filename pattern is meant to allow the
// legacy merger (which will be deprecated) to support long/float dims. Going forward, the V9 merger
@ -751,17 +742,7 @@ public class IndexIO
}
final ColumnDescriptor serdeficator = builder.build();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
serializerUtils.writeString(baos, mapper.writeValueAsString(serdeficator));
byte[] specBytes = baos.toByteArray();
final SmooshedWriter channel = v9Smoosher.addWithSmooshedWriter(
metric, serdeficator.numBytes() + specBytes.length
);
channel.write(ByteBuffer.wrap(specBytes));
serdeficator.write(channel, v9Smoosher);
channel.close();
makeColumn(v9Smoosher, metric, serdeficator);
} else if (String.format("time_%s.drd", BYTE_ORDER).equals(filename)) {
CompressedLongsIndexedSupplier timestamps = CompressedLongsIndexedSupplier.fromByteBuffer(
v8SmooshedFiles.mapFile(filename),
@ -778,17 +759,7 @@ public class IndexIO
.build()
);
final ColumnDescriptor serdeficator = builder.build();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
serializerUtils.writeString(baos, mapper.writeValueAsString(serdeficator));
byte[] specBytes = baos.toByteArray();
final SmooshedWriter channel = v9Smoosher.addWithSmooshedWriter(
"__time", serdeficator.numBytes() + specBytes.length
);
channel.write(ByteBuffer.wrap(specBytes));
serdeficator.write(channel, v9Smoosher);
channel.close();
makeColumn(v9Smoosher, "__time", serdeficator);
} else {
skippedFiles.add(filename);
}
@ -854,6 +825,20 @@ public class IndexIO
closer.close();
}
}
private void makeColumn(FileSmoosher v9Smoosher, String dimension, ColumnDescriptor serdeficator)
throws IOException
{
ZeroCopyByteArrayOutputStream specBytes = new ZeroCopyByteArrayOutputStream();
serializerUtils.writeString(specBytes, mapper.writeValueAsString(serdeficator));
try (SmooshedWriter channel = v9Smoosher.addWithSmooshedWriter(
dimension, serdeficator.numBytes() + specBytes.size()
)) {
specBytes.writeTo(channel);
serdeficator.write(channel, v9Smoosher);
}
}
}
static interface IndexLoader

View File

@ -31,6 +31,7 @@ import com.google.common.io.Files;
import com.google.common.primitives.Ints;
import com.google.inject.Inject;
import io.druid.common.utils.JodaUtils;
import io.druid.io.ZeroCopyByteArrayOutputStream;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.io.smoosh.FileSmoosher;
import io.druid.java.util.common.io.smoosh.SmooshedWriter;
@ -56,7 +57,6 @@ import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@ -402,20 +402,14 @@ public class IndexMergerV9 extends IndexMerger
final ColumnDescriptor serdeficator
) throws IOException
{
ByteArrayOutputStream baos = new ByteArrayOutputStream();
serializerUtils.writeString(baos, mapper.writeValueAsString(serdeficator));
byte[] specBytes = baos.toByteArray();
final SmooshedWriter channel = v9Smoosher.addWithSmooshedWriter(
columnName, serdeficator.numBytes() + specBytes.length
);
try {
channel.write(ByteBuffer.wrap(specBytes));
ZeroCopyByteArrayOutputStream specBytes = new ZeroCopyByteArrayOutputStream();
serializerUtils.writeString(specBytes, mapper.writeValueAsString(serdeficator));
try (SmooshedWriter channel = v9Smoosher.addWithSmooshedWriter(
columnName, serdeficator.numBytes() + specBytes.size()
)) {
specBytes.writeTo(channel);
serdeficator.write(channel, v9Smoosher);
}
finally {
channel.close();
}
}
private void mergeIndexesAndWriteColumns(

View File

@ -29,6 +29,9 @@ import java.io.IOException;
import java.nio.ByteOrder;
import java.nio.channels.WritableByteChannel;
/**
* Unsafe for concurrent use from multiple threads.
*/
public class LongColumnSerializer implements GenericColumnSerializer
{
public static LongColumnSerializer create(

View File

@ -30,6 +30,7 @@ import java.io.File;
import java.io.IOException;
/**
* Unsafe for concurrent use from multiple threads.
*/
public class LongMetricColumnSerializer implements MetricColumnSerializer
{

View File

@ -26,11 +26,13 @@ import com.google.common.io.ByteStreams;
import com.google.common.io.CountingOutputStream;
import com.google.common.io.InputSupplier;
import com.google.common.primitives.Ints;
import io.druid.common.utils.SerializerUtils;
import io.druid.java.util.common.io.smoosh.FileSmoosher;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
@ -46,6 +48,7 @@ public class ByteBufferWriter<T> implements Closeable
private CountingOutputStream headerOut = null;
private CountingOutputStream valueOut = null;
private final ByteBuffer helperBuffer = ByteBuffer.allocate(Ints.BYTES);
public ByteBufferWriter(
IOPeon ioPeon,
@ -67,8 +70,7 @@ public class ByteBufferWriter<T> implements Closeable
public void write(T objectToWrite) throws IOException
{
byte[] bytesToWrite = strategy.toBytes(objectToWrite);
headerOut.write(Ints.toByteArray(bytesToWrite.length));
SerializerUtils.writeBigEndianIntToOutputStream(headerOut, bytesToWrite.length, helperBuffer);
valueOut.write(bytesToWrite);
}

View File

@ -80,9 +80,8 @@ public class ConciseBitmapSerdeFactory implements BitmapSerdeFactory
@Override
public WrappedImmutableConciseBitmap fromByteBuffer(ByteBuffer buffer, int numBytes)
{
final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
readOnlyBuffer.limit(readOnlyBuffer.position() + numBytes);
return new WrappedImmutableConciseBitmap(new ImmutableConciseSet(readOnlyBuffer));
buffer.limit(buffer.position() + numBytes);
return new WrappedImmutableConciseBitmap(new ImmutableConciseSet(buffer));
}
@Override

View File

@ -19,26 +19,23 @@
package io.druid.segment.data;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Ints;
import io.druid.common.utils.SerializerUtils;
import io.druid.io.ZeroCopyByteArrayOutputStream;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import it.unimi.dsi.fastutil.bytes.ByteArrays;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
/**
* A generic, flat storage mechanism. Use static methods fromArray() or fromIterable() to construct. If input
@ -71,9 +68,10 @@ import java.util.List;
*/
public class GenericIndexed<T> implements Indexed<T>
{
public static final byte VERSION_ONE = 0x1;
public static final byte VERSION_TWO = 0x2;
private static final byte REVERSE_LOOKUP_ALLOWED = 0x1;
static final byte VERSION_ONE = 0x1;
static final byte VERSION_TWO = 0x2;
static final byte REVERSE_LOOKUP_ALLOWED = 0x1;
static final byte REVERSE_LOOKUP_DISALLOWED = 0x0;
private final static Ordering<String> NATURAL_STRING_ORDERING = Ordering.natural().nullsFirst();
private static final SerializerUtils SERIALIZER_UTILS = new SerializerUtils();
@ -107,129 +105,32 @@ public class GenericIndexed<T> implements Indexed<T>
}
};
private final ObjectStrategy<T> strategy;
private final boolean allowReverseLookup;
private final int size;
private final BufferIndexed bufferIndexed;
private final List<ByteBuffer> valueBuffers;
private final ByteBuffer headerBuffer;
private int logBaseTwoOfElementsPerValueFile;
private ByteBuffer theBuffer;
// used for single file version, v1
GenericIndexed(
ByteBuffer buffer,
ObjectStrategy<T> strategy,
boolean allowReverseLookup
)
public static <T> GenericIndexed<T> read(ByteBuffer buffer, ObjectStrategy<T> strategy)
{
this.theBuffer = buffer;
this.strategy = strategy;
this.allowReverseLookup = allowReverseLookup;
size = theBuffer.getInt();
byte versionFromBuffer = buffer.get();
int indexOffset = theBuffer.position();
int valuesOffset = theBuffer.position() + size * Ints.BYTES;
buffer.position(valuesOffset);
valueBuffers = Lists.newArrayList(buffer.slice());
buffer.position(indexOffset);
headerBuffer = buffer.slice();
final ByteBuffer valueBuffer = valueBuffers.get(0);
bufferIndexed = new BufferIndexed()
{
@Override
public T get(int index)
{
checkIndex(index, size);
final int startOffset;
final int endOffset;
if (index == 0) {
startOffset = 4;
endOffset = headerBuffer.getInt(0);
} else {
int headerPosition = (index - 1) * Ints.BYTES;
startOffset = headerBuffer.getInt(headerPosition) + Ints.BYTES;
endOffset = headerBuffer.getInt(headerPosition + Ints.BYTES);
}
return _get(valueBuffer.asReadOnlyBuffer(), startOffset, endOffset);
}
};
if (VERSION_ONE == versionFromBuffer) {
return createGenericIndexedVersionOne(buffer, strategy);
} else if (VERSION_TWO == versionFromBuffer) {
throw new IAE(
"use read(ByteBuffer buffer, ObjectStrategy<T> strategy, SmooshedFileMapper fileMapper)"
+ " to read version 2 indexed."
);
}
throw new IAE("Unknown version[%d]", (int) versionFromBuffer);
}
// used for multiple file version, v2.
GenericIndexed(
List<ByteBuffer> valueBuffs,
ByteBuffer headerBuff,
ObjectStrategy<T> strategy,
boolean allowReverseLookup,
int logBaseTwoOfElementsPerValueFile,
int numWritten
)
public static <T> GenericIndexed<T> read(ByteBuffer buffer, ObjectStrategy<T> strategy, SmooshedFileMapper fileMapper)
{
this.strategy = strategy;
this.allowReverseLookup = allowReverseLookup;
this.valueBuffers = valueBuffs;
this.headerBuffer = headerBuff;
this.size = numWritten;
this.logBaseTwoOfElementsPerValueFile = logBaseTwoOfElementsPerValueFile;
headerBuffer.order(ByteOrder.nativeOrder());
bufferIndexed = new BufferIndexed()
{
@Override
public T get(int index)
{
int fileNum = index >> GenericIndexed.this.logBaseTwoOfElementsPerValueFile;
final ByteBuffer copyBuffer = valueBuffers.get(fileNum).asReadOnlyBuffer();
byte versionFromBuffer = buffer.get();
checkIndex(index, size);
final int startOffset;
final int endOffset;
int relativePositionOfIndex = index & ((1 << GenericIndexed.this.logBaseTwoOfElementsPerValueFile) - 1);
if (relativePositionOfIndex == 0) {
int headerPosition = index * Ints.BYTES;
startOffset = 4;
endOffset = headerBuffer.getInt(headerPosition);
} else {
int headerPosition = (index - 1) * Ints.BYTES;
startOffset = headerBuffer.getInt(headerPosition) + 4;
endOffset = headerBuffer.getInt(headerPosition + 4);
}
return _get(copyBuffer, startOffset, endOffset);
}
};
}
public static int getNumberOfFilesRequired(int bagSize, long numWritten)
{
int numberOfFilesRequired = (int) (numWritten / bagSize);
if ((numWritten % bagSize) != 0) {
numberOfFilesRequired += 1;
if (VERSION_ONE == versionFromBuffer) {
return createGenericIndexedVersionOne(buffer, strategy);
} else if (VERSION_TWO == versionFromBuffer) {
return createGenericIndexedVersionTwo(buffer, strategy, fileMapper);
}
return numberOfFilesRequired;
}
/**
* Checks if {@code index} a valid `element index` in GenericIndexed.
* Similar to Preconditions.checkElementIndex() except this method throws {@link IAE} with custom error message.
* <p>
* Used here to get existing behavior(same error message and exception) of V1 GenericIndexed.
*
* @param index index identifying an element of an GenericIndexed.
* @param size number of elements.
*/
private static void checkIndex(int index, int size)
{
if (index < 0) {
throw new IAE("Index[%s] < 0", index);
}
if (index >= size) {
throw new IAE(String.format("Index[%s] >= size[%s]", index, size));
}
throw new IAE("Unknown version [%s]", versionFromBuffer);
}
public static <T> GenericIndexed<T> fromArray(T[] objects, ObjectStrategy<T> strategy)
@ -239,160 +140,121 @@ public class GenericIndexed<T> implements Indexed<T>
public static <T> GenericIndexed<T> fromIterable(Iterable<T> objectsIterable, ObjectStrategy<T> strategy)
{
Iterator<T> objects = objectsIterable.iterator();
if (!objects.hasNext()) {
final ByteBuffer buffer = ByteBuffer.allocate(4).putInt(0);
buffer.flip();
return new GenericIndexed<T>(buffer, strategy, true);
}
boolean allowReverseLookup = true;
int count = 0;
ByteArrayOutputStream headerBytes = new ByteArrayOutputStream();
ByteArrayOutputStream valueBytes = new ByteArrayOutputStream();
try {
int offset = 0;
T prevVal = null;
do {
count++;
T next = objects.next();
if (allowReverseLookup && prevVal != null && !(strategy.compare(prevVal, next) < 0)) {
allowReverseLookup = false;
}
final byte[] bytes = strategy.toBytes(next);
offset += 4 + bytes.length;
headerBytes.write(Ints.toByteArray(offset));
valueBytes.write(Ints.toByteArray(bytes.length));
valueBytes.write(bytes);
if (prevVal instanceof Closeable) {
CloseQuietly.close((Closeable) prevVal);
}
prevVal = next;
} while (objects.hasNext());
if (prevVal instanceof Closeable) {
CloseQuietly.close((Closeable) prevVal);
}
}
catch (IOException e) {
throw new RuntimeException(e);
}
ByteBuffer theBuffer = ByteBuffer.allocate(Ints.BYTES + headerBytes.size() + valueBytes.size());
theBuffer.put(Ints.toByteArray(count));
theBuffer.put(headerBytes.toByteArray());
theBuffer.put(valueBytes.toByteArray());
theBuffer.flip();
return new GenericIndexed<T>(theBuffer.asReadOnlyBuffer(), strategy, allowReverseLookup);
return fromIterableVersionOne(objectsIterable, strategy);
}
public static <T> GenericIndexed<T> read(ByteBuffer buffer, ObjectStrategy<T> strategy)
static int getNumberOfFilesRequired(int bagSize, long numWritten)
{
byte versionFromBuffer = buffer.get();
if (VERSION_ONE == versionFromBuffer) {
return createVersionOneGenericIndexed(buffer, strategy);
} else if (VERSION_TWO == versionFromBuffer) {
throw new IAE(
"use read(ByteBuffer buffer, ObjectStrategy<T> strategy, SmooshedFileMapper fileMapper)"
+ " to read version 2 indexed.",
versionFromBuffer
);
int numberOfFilesRequired = (int) (numWritten / bagSize);
if ((numWritten % bagSize) != 0) {
numberOfFilesRequired += 1;
}
throw new IAE("Unknown version[%s]", versionFromBuffer);
return numberOfFilesRequired;
}
private static <T> GenericIndexed<T> createVersionOneGenericIndexed(ByteBuffer byteBuffer, ObjectStrategy<T> strategy)
{
boolean allowReverseLookup = byteBuffer.get() == REVERSE_LOOKUP_ALLOWED;
int size = byteBuffer.getInt();
ByteBuffer bufferToUse = byteBuffer.asReadOnlyBuffer();
bufferToUse.limit(bufferToUse.position() + size);
byteBuffer.position(bufferToUse.limit());
return new GenericIndexed<T>(
bufferToUse,
strategy,
allowReverseLookup
);
}
private final boolean versionOne;
private static <T> GenericIndexed<T> createVersionTwoGenericIndexed(
ByteBuffer byteBuffer,
private final ObjectStrategy<T> strategy;
private final boolean allowReverseLookup;
private final int size;
private final ByteBuffer headerBuffer;
private final ByteBuffer firstValueBuffer;
private final ByteBuffer[] valueBuffers;
private int logBaseTwoOfElementsPerValueFile;
private int relativeIndexMask;
private ByteBuffer theBuffer;
/**
* Constructor for version one.
*/
GenericIndexed(
ByteBuffer buffer,
ObjectStrategy<T> strategy,
SmooshedFileMapper fileMapper
boolean allowReverseLookup
)
{
if (fileMapper == null) {
throw new IAE("SmooshedFileMapper can not be null for version 2.");
}
boolean allowReverseLookup = byteBuffer.get() == REVERSE_LOOKUP_ALLOWED;
int logBaseTwoOfElementsPerValueFile = byteBuffer.getInt();
int numElements = byteBuffer.getInt();
String columnName;
this.versionOne = true;
List<ByteBuffer> valueBuffersToUse;
ByteBuffer headerBuffer;
try {
columnName = SERIALIZER_UTILS.readString(byteBuffer);
valueBuffersToUse = Lists.newArrayList();
int elementsPerValueFile = 1 << logBaseTwoOfElementsPerValueFile;
int numberOfFilesRequired = getNumberOfFilesRequired(elementsPerValueFile, numElements);
for (int i = 0; i < numberOfFilesRequired; i++) {
valueBuffersToUse.add(
fileMapper.mapFile(GenericIndexedWriter.generateValueFileName(columnName, i))
.asReadOnlyBuffer()
);
}
headerBuffer = fileMapper.mapFile(GenericIndexedWriter.generateHeaderFileName(columnName));
}
catch (IOException e) {
throw new RuntimeException("File mapping failed.", e);
}
this.theBuffer = buffer;
this.strategy = strategy;
this.allowReverseLookup = allowReverseLookup;
size = theBuffer.getInt();
return new GenericIndexed<T>(
valueBuffersToUse,
headerBuffer,
strategy,
allowReverseLookup,
logBaseTwoOfElementsPerValueFile,
numElements
);
int indexOffset = theBuffer.position();
int valuesOffset = theBuffer.position() + size * Ints.BYTES;
buffer.position(valuesOffset);
// Ensure the value buffer's limit equals to capacity.
firstValueBuffer = buffer.slice();
valueBuffers = new ByteBuffer[]{firstValueBuffer};
buffer.position(indexOffset);
headerBuffer = buffer.slice();
}
public static <T> GenericIndexed<T> read(ByteBuffer buffer, ObjectStrategy<T> strategy, SmooshedFileMapper fileMapper)
/**
* Constructor for version two.
*/
GenericIndexed(
ByteBuffer[] valueBuffs,
ByteBuffer headerBuff,
ObjectStrategy<T> strategy,
boolean allowReverseLookup,
int logBaseTwoOfElementsPerValueFile,
int numWritten
)
{
byte versionFromBuffer = buffer.get();
this.versionOne = false;
if (VERSION_ONE == versionFromBuffer) {
return createVersionOneGenericIndexed(buffer, strategy);
} else if (VERSION_TWO == versionFromBuffer) {
return createVersionTwoGenericIndexed(buffer, strategy, fileMapper);
this.strategy = strategy;
this.allowReverseLookup = allowReverseLookup;
this.valueBuffers = valueBuffs;
this.firstValueBuffer = valueBuffers[0];
this.headerBuffer = headerBuff;
this.size = numWritten;
this.logBaseTwoOfElementsPerValueFile = logBaseTwoOfElementsPerValueFile;
this.relativeIndexMask = (1 << logBaseTwoOfElementsPerValueFile) - 1;
headerBuffer.order(ByteOrder.nativeOrder());
}
/**
* Checks if {@code index} a valid `element index` in GenericIndexed.
* Similar to Preconditions.checkElementIndex() except this method throws {@link IAE} with custom error message.
* <p>
* Used here to get existing behavior(same error message and exception) of V1 GenericIndexed.
*
* @param index index identifying an element of an GenericIndexed.
*/
private void checkIndex(int index)
{
if (index < 0) {
throw new IAE("Index[%s] < 0", index);
}
if (index >= size) {
throw new IAE(String.format("Index[%s] >= size[%s]", index, size));
}
throw new IAE("Unknown version [%s]", versionFromBuffer);
}
@Override
public Class<? extends T> getClazz()
{
return bufferIndexed.getClazz();
return strategy.getClazz();
}
@Override
public int size()
{
return bufferIndexed.size();
return size;
}
@Override
public T get(int index)
{
return bufferIndexed.get(index);
return versionOne ? getVersionOne(index) : getVersionTwo(index);
}
/**
@ -407,35 +269,56 @@ public class GenericIndexed<T> implements Indexed<T>
@Override
public int indexOf(T value)
{
return bufferIndexed.indexOf(value);
return indexOf(this, value);
}
private int indexOf(Indexed<T> indexed, T value)
{
if (!allowReverseLookup) {
throw new UnsupportedOperationException("Reverse lookup not allowed.");
}
value = (value != null && value.equals("")) ? null : value;
int minIndex = 0;
int maxIndex = size - 1;
while (minIndex <= maxIndex) {
int currIndex = (minIndex + maxIndex) >>> 1;
T currValue = indexed.get(currIndex);
int comparison = strategy.compare(currValue, value);
if (comparison == 0) {
return currIndex;
}
if (comparison < 0) {
minIndex = currIndex + 1;
} else {
maxIndex = currIndex - 1;
}
}
return -(minIndex + 1);
}
@Override
public Iterator<T> iterator()
{
return bufferIndexed.iterator();
return IndexedIterable.create(this).iterator();
}
public long getSerializedSize()
{
if (valueBuffers.size() != 1) {
if (!versionOne) {
throw new UnsupportedOperationException("Method not supported for version 2 GenericIndexed.");
}
return theBuffer.remaining()
+ 2
+ Ints.BYTES
+ Ints.BYTES; //2 Bytes for version and sorted flag. 4 bytes to store numbers
// of bytes and next 4 bytes to store number of elements.
return getSerializedSizeVersionOne();
}
public void writeToChannel(WritableByteChannel channel) throws IOException
{
//version 2 will always have more than one buffer in valueBuffers.
if (valueBuffers.size() == 1) {
channel.write(ByteBuffer.wrap(new byte[]{VERSION_ONE, allowReverseLookup ? (byte) 0x1 : (byte) 0x0}));
channel.write(ByteBuffer.wrap(Ints.toByteArray(theBuffer.remaining() + 4))); // 4 Bytes to store size.
channel.write(ByteBuffer.wrap(Ints.toByteArray(size)));
channel.write(theBuffer.asReadOnlyBuffer());
if (versionOne) {
writeToChannelVersionOne(channel);
} else {
throw new UnsupportedOperationException(
"GenericIndexed serialization for V2 is unsupported. Use GenericIndexedWriter instead.");
@ -449,64 +332,19 @@ public class GenericIndexed<T> implements Indexed<T>
*/
public GenericIndexed<T>.BufferIndexed singleThreaded()
{
if (valueBuffers.size() == 1) {
final ByteBuffer copyBuffer = valueBuffers.get(0).asReadOnlyBuffer();
return new BufferIndexed()
{
@Override
public T get(final int index)
{
checkIndex(index, size);
return versionOne ? singleThreadedVersionOne() : singleThreadedVersionTwo();
}
final int startOffset;
final int endOffset;
if (index == 0) {
startOffset = 4;
endOffset = headerBuffer.getInt(0);
} else {
int headerPosition = (index - 1) * Ints.BYTES;
startOffset = headerBuffer.getInt(headerPosition) + 4;
endOffset = headerBuffer.getInt(headerPosition + 4);
}
return _get(copyBuffer, startOffset, endOffset);
}
};
} else {
final List<ByteBuffer> copyValueBuffers = new ArrayList<>();
for (ByteBuffer buffer : valueBuffers) {
copyValueBuffers.add(buffer.asReadOnlyBuffer());
}
return new BufferIndexed()
{
@Override
public T get(final int index)
{
int fileNum = index >> logBaseTwoOfElementsPerValueFile;
final ByteBuffer copyBuffer = copyValueBuffers.get(fileNum);
checkIndex(index, size);
final int startOffset;
final int endOffset;
int relativePositionOfIndex = index & ((1 << logBaseTwoOfElementsPerValueFile) - 1);
if (relativePositionOfIndex == 0) {
int headerPosition = index * Ints.BYTES;
startOffset = 4;
endOffset = headerBuffer.getInt(headerPosition);
} else {
int headerPosition = (index - 1) * Ints.BYTES;
startOffset = headerBuffer.getInt(headerPosition) + 4;
endOffset = headerBuffer.getInt(headerPosition + 4);
}
return _get(copyBuffer, startOffset, endOffset);
}
};
private T copyBufferAndGet(ByteBuffer valueBuffer, int startOffset, int endOffset)
{
final int size = endOffset - startOffset;
if (size == 0) {
return null;
}
ByteBuffer copyValueBuffer = valueBuffer.asReadOnlyBuffer();
copyValueBuffer.position(startOffset);
// fromByteBuffer must not modify the buffer limit
return strategy.fromByteBuffer(copyValueBuffer, size);
}
abstract class BufferIndexed implements Indexed<T>
@ -525,15 +363,19 @@ public class GenericIndexed<T> implements Indexed<T>
return size;
}
protected T _get(ByteBuffer copyValueBuffer, int startOffset, int endOffset)
T bufferedIndexedGet(ByteBuffer copyValueBuffer, int startOffset, int endOffset)
{
final int size = endOffset - startOffset;
if (startOffset == endOffset) {
lastReadSize = size;
if (size == 0) {
return null;
}
// ObjectStrategy.fromByteBuffer() is allowed to reset the limit of the buffer. So if the limit is changed,
// position() call in the next line could throw an exception, if the position is set beyond the new limit. clear()
// sets the limit to the maximum possible, the capacity. It is safe to reset the limit to capacity, because the
// value buffer(s) initial limit equals to capacity.
copyValueBuffer.clear();
copyValueBuffer.position(startOffset);
lastReadSize = size;
// fromByteBuffer must not modify the buffer limit
return strategy.fromByteBuffer(copyValueBuffer, size);
}
@ -542,7 +384,7 @@ public class GenericIndexed<T> implements Indexed<T>
*
* @return the size in bytes of the last value read
*/
public int getLastValueSize()
int getLastValueSize()
{
return lastReadSize;
}
@ -550,38 +392,247 @@ public class GenericIndexed<T> implements Indexed<T>
@Override
public int indexOf(T value)
{
if (!allowReverseLookup) {
throw new UnsupportedOperationException("Reverse lookup not allowed.");
}
value = (value != null && value.equals("")) ? null : value;
int minIndex = 0;
int maxIndex = size - 1;
while (minIndex <= maxIndex) {
int currIndex = (minIndex + maxIndex) >>> 1;
T currValue = get(currIndex);
int comparison = strategy.compare(currValue, value);
if (comparison == 0) {
return currIndex;
}
if (comparison < 0) {
minIndex = currIndex + 1;
} else {
maxIndex = currIndex - 1;
}
}
return -(minIndex + 1);
return GenericIndexed.this.indexOf(this, value);
}
@Override
public Iterator<T> iterator()
{
return IndexedIterable.create(this).iterator();
return GenericIndexed.this.iterator();
}
}
///////////////
// VERSION ONE
///////////////
private static <T> GenericIndexed<T> createGenericIndexedVersionOne(ByteBuffer byteBuffer, ObjectStrategy<T> strategy)
{
boolean allowReverseLookup = byteBuffer.get() == REVERSE_LOOKUP_ALLOWED;
int size = byteBuffer.getInt();
ByteBuffer bufferToUse = byteBuffer.asReadOnlyBuffer();
bufferToUse.limit(bufferToUse.position() + size);
byteBuffer.position(bufferToUse.limit());
return new GenericIndexed<>(
bufferToUse,
strategy,
allowReverseLookup
);
}
private static <T> GenericIndexed<T> fromIterableVersionOne(Iterable<T> objectsIterable, ObjectStrategy<T> strategy)
{
Iterator<T> objects = objectsIterable.iterator();
if (!objects.hasNext()) {
final ByteBuffer buffer = ByteBuffer.allocate(Ints.BYTES).putInt(0);
buffer.flip();
return new GenericIndexed<>(buffer, strategy, true);
}
boolean allowReverseLookup = true;
int count = 0;
ZeroCopyByteArrayOutputStream headerBytes = new ZeroCopyByteArrayOutputStream();
ZeroCopyByteArrayOutputStream valueBytes = new ZeroCopyByteArrayOutputStream();
ByteBuffer helperBuffer = ByteBuffer.allocate(Ints.BYTES);
try {
int offset = 0;
T prevVal = null;
do {
count++;
T next = objects.next();
if (allowReverseLookup && prevVal != null && !(strategy.compare(prevVal, next) < 0)) {
allowReverseLookup = false;
}
final byte[] bytes = strategy.toBytes(next);
offset += Ints.BYTES + bytes.length;
SerializerUtils.writeBigEndianIntToOutputStream(headerBytes, offset, helperBuffer);
SerializerUtils.writeBigEndianIntToOutputStream(valueBytes, bytes.length, helperBuffer);
valueBytes.write(bytes);
if (prevVal instanceof Closeable) {
CloseQuietly.close((Closeable) prevVal);
}
prevVal = next;
} while (objects.hasNext());
if (prevVal instanceof Closeable) {
CloseQuietly.close((Closeable) prevVal);
}
}
catch (IOException e) {
throw new RuntimeException(e);
}
ByteBuffer theBuffer = ByteBuffer.allocate(Ints.BYTES + headerBytes.size() + valueBytes.size());
theBuffer.putInt(count);
headerBytes.writeTo(theBuffer);
valueBytes.writeTo(theBuffer);
theBuffer.flip();
return new GenericIndexed<>(theBuffer.asReadOnlyBuffer(), strategy, allowReverseLookup);
}
private long getSerializedSizeVersionOne()
{
return theBuffer.remaining()
+ 1 // version byte
+ 1 // allowReverseLookup flag
+ Ints.BYTES // numBytesUsed
+ Ints.BYTES; // numElements
}
private T getVersionOne(int index)
{
checkIndex(index);
final int startOffset;
final int endOffset;
if (index == 0) {
startOffset = Ints.BYTES;
endOffset = headerBuffer.getInt(0);
} else {
int headerPosition = (index - 1) * Ints.BYTES;
startOffset = headerBuffer.getInt(headerPosition) + Ints.BYTES;
endOffset = headerBuffer.getInt(headerPosition + Ints.BYTES);
}
return copyBufferAndGet(firstValueBuffer, startOffset, endOffset);
}
private BufferIndexed singleThreadedVersionOne()
{
final ByteBuffer copyBuffer = firstValueBuffer.asReadOnlyBuffer();
return new BufferIndexed()
{
@Override
public T get(final int index)
{
checkIndex(index);
final int startOffset;
final int endOffset;
if (index == 0) {
startOffset = 4;
endOffset = headerBuffer.getInt(0);
} else {
int headerPosition = (index - 1) * Ints.BYTES;
startOffset = headerBuffer.getInt(headerPosition) + Ints.BYTES;
endOffset = headerBuffer.getInt(headerPosition + Ints.BYTES);
}
return bufferedIndexedGet(copyBuffer, startOffset, endOffset);
}
};
}
private void writeToChannelVersionOne(WritableByteChannel channel) throws IOException
{
channel.write(ByteBuffer.wrap(new byte[]{
VERSION_ONE,
allowReverseLookup ? REVERSE_LOOKUP_ALLOWED : REVERSE_LOOKUP_DISALLOWED
}));
channel.write(ByteBuffer.wrap(Ints.toByteArray(theBuffer.remaining() + Ints.BYTES)));
channel.write(ByteBuffer.wrap(Ints.toByteArray(size)));
channel.write(theBuffer.asReadOnlyBuffer());
}
///////////////
// VERSION TWO
///////////////
private static <T> GenericIndexed<T> createGenericIndexedVersionTwo(
ByteBuffer byteBuffer,
ObjectStrategy<T> strategy,
SmooshedFileMapper fileMapper
)
{
if (fileMapper == null) {
throw new IAE("SmooshedFileMapper can not be null for version 2.");
}
boolean allowReverseLookup = byteBuffer.get() == REVERSE_LOOKUP_ALLOWED;
int logBaseTwoOfElementsPerValueFile = byteBuffer.getInt();
int numElements = byteBuffer.getInt();
try {
String columnName = SERIALIZER_UTILS.readString(byteBuffer);
int elementsPerValueFile = 1 << logBaseTwoOfElementsPerValueFile;
int numberOfFilesRequired = getNumberOfFilesRequired(elementsPerValueFile, numElements);
ByteBuffer[] valueBuffersToUse = new ByteBuffer[numberOfFilesRequired];
for (int i = 0; i < numberOfFilesRequired; i++) {
// SmooshedFileMapper.mapFile() contract guarantees that the valueBuffer's limit equals to capacity.
ByteBuffer valueBuffer = fileMapper.mapFile(GenericIndexedWriter.generateValueFileName(columnName, i));
valueBuffersToUse[i] = valueBuffer.asReadOnlyBuffer();
}
ByteBuffer headerBuffer = fileMapper.mapFile(GenericIndexedWriter.generateHeaderFileName(columnName));
return new GenericIndexed<>(
valueBuffersToUse,
headerBuffer,
strategy,
allowReverseLookup,
logBaseTwoOfElementsPerValueFile,
numElements
);
}
catch (IOException e) {
throw new RuntimeException("File mapping failed.", e);
}
}
private T getVersionTwo(int index)
{
checkIndex(index);
final int startOffset;
final int endOffset;
int relativePositionOfIndex = index & relativeIndexMask;
if (relativePositionOfIndex == 0) {
int headerPosition = index * Ints.BYTES;
startOffset = Ints.BYTES;
endOffset = headerBuffer.getInt(headerPosition);
} else {
int headerPosition = (index - 1) * Ints.BYTES;
startOffset = headerBuffer.getInt(headerPosition) + Ints.BYTES;
endOffset = headerBuffer.getInt(headerPosition + Ints.BYTES);
}
int fileNum = index >> logBaseTwoOfElementsPerValueFile;
return copyBufferAndGet(valueBuffers[fileNum], startOffset, endOffset);
}
private BufferIndexed singleThreadedVersionTwo()
{
final ByteBuffer[] copyValueBuffers = new ByteBuffer[valueBuffers.length];
for (int i = 0; i < valueBuffers.length; i++) {
copyValueBuffers[i] = valueBuffers[i].asReadOnlyBuffer();
}
return new BufferIndexed()
{
@Override
public T get(final int index)
{
checkIndex(index);
final int startOffset;
final int endOffset;
int relativePositionOfIndex = index & relativeIndexMask;
if (relativePositionOfIndex == 0) {
int headerPosition = index * Ints.BYTES;
startOffset = 4;
endOffset = headerBuffer.getInt(headerPosition);
} else {
int headerPosition = (index - 1) * Ints.BYTES;
startOffset = headerBuffer.getInt(headerPosition) + Ints.BYTES;
endOffset = headerBuffer.getInt(headerPosition + Ints.BYTES);
}
int fileNum = index >> logBaseTwoOfElementsPerValueFile;
return bufferedIndexedGet(copyValueBuffers[fileNum], startOffset, endOffset);
}
};
}
}

View File

@ -27,6 +27,7 @@ import com.google.common.io.CountingOutputStream;
import com.google.common.io.InputSupplier;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import io.druid.common.utils.SerializerUtils;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.io.smoosh.FileSmoosher;
@ -66,6 +67,7 @@ public class GenericIndexedWriter<T> implements Closeable
private long numWritten = 0;
private boolean requireMultipleFiles = false;
private ByteBuffer buf;
private final ByteBuffer sizeHelperBuffer = ByteBuffer.allocate(Ints.BYTES);
public GenericIndexedWriter(
@ -122,20 +124,6 @@ public class GenericIndexedWriter<T> implements Closeable
}
}
private static void writeLongValueToOutputStream(ByteBuffer helperBuf, long value, CountingOutputStream outLong)
throws IOException
{
helperBuf.putLong(0, value);
outLong.write(helperBuf.array());
}
private static void writeIntValueToOutputStream(ByteBuffer helperBuf, int value, CountingOutputStream outLong)
throws IOException
{
helperBuf.putInt(0, value);
outLong.write(helperBuf.array());
}
public void open() throws IOException
{
headerOut = new CountingOutputStream(ioPeon.makeOutputStream(makeFilename("header")));
@ -151,13 +139,13 @@ public class GenericIndexedWriter<T> implements Closeable
byte[] bytesToWrite = strategy.toBytes(objectToWrite);
++numWritten;
valuesOut.write(Ints.toByteArray(bytesToWrite.length));
SerializerUtils.writeBigEndianIntToOutputStream(valuesOut, bytesToWrite.length, sizeHelperBuffer);
valuesOut.write(bytesToWrite);
if (!requireMultipleFiles) {
writeIntValueToOutputStream(buf, Ints.checkedCast(valuesOut.getCount()), headerOut);
SerializerUtils.writeBigEndianIntToOutputStream(headerOut, Ints.checkedCast(valuesOut.getCount()), buf);
} else {
writeLongValueToOutputStream(buf, valuesOut.getCount(), headerOutLong);
SerializerUtils.writeNativeOrderedLongToOutputStream(headerOutLong, valuesOut.getCount(), buf);
}
if (!requireMultipleFiles && getSerializedSize() > fileSizeLimit) {
@ -205,7 +193,7 @@ public class GenericIndexedWriter<T> implements Closeable
try (OutputStream metaOut = ioPeon.makeOutputStream(makeFilename("meta"))) {
metaOut.write(GenericIndexed.VERSION_ONE);
metaOut.write(objectsSorted ? 0x1 : 0x0);
metaOut.write(objectsSorted ? GenericIndexed.REVERSE_LOOKUP_ALLOWED : GenericIndexed.REVERSE_LOOKUP_DISALLOWED);
metaOut.write(Ints.toByteArray(Ints.checkedCast(numBytesWritten + 4)));
metaOut.write(Ints.toByteArray(Ints.checkedCast(numWritten)));
}
@ -366,7 +354,7 @@ public class GenericIndexedWriter<T> implements Closeable
int bagSizePower = bagSizePower();
OutputStream metaOut = Channels.newOutputStream(channel);
metaOut.write(GenericIndexed.VERSION_TWO);
metaOut.write(objectsSorted ? 0x1 : 0x0);
metaOut.write(objectsSorted ? GenericIndexed.REVERSE_LOOKUP_ALLOWED : GenericIndexed.REVERSE_LOOKUP_DISALLOWED);
metaOut.write(Ints.toByteArray(bagSizePower));
metaOut.write(Ints.toByteArray(Ints.checkedCast(numWritten)));
metaOut.write(Ints.toByteArray(fileNameByteArray.length));
@ -436,7 +424,11 @@ public class GenericIndexedWriter<T> implements Closeable
}
currentNumBytes = Long.reverseBytes(headerFile.readLong());
relativeNumBytes = currentNumBytes - relativeRefBytes;
writeIntValueToOutputStream(helperBuffer, Ints.checkedCast(relativeNumBytes), finalHeaderOut);
SerializerUtils.writeNativeOrderedIntToOutputStream(
finalHeaderOut,
Ints.checkedCast(relativeNumBytes),
helperBuffer
);
}
long numBytesToPutInFile = finalHeaderOut.getCount();
@ -460,7 +452,7 @@ public class GenericIndexedWriter<T> implements Closeable
ByteBuffer buf = ByteBuffer.allocate(Longs.BYTES).order(ByteOrder.nativeOrder());
for (int i = 0; i < numWritten; i++) {
int count = headerFile.readInt();
writeLongValueToOutputStream(buf, count, headerOutLong);
SerializerUtils.writeNativeOrderedLongToOutputStream(headerOutLong, count, buf);
}
}
}

View File

@ -86,9 +86,8 @@ public class IndexedRTree implements Comparable<IndexedRTree>
@Override
public ImmutableRTree fromByteBuffer(ByteBuffer buffer, int numBytes)
{
final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
readOnlyBuffer.limit(readOnlyBuffer.position() + numBytes);
return new ImmutableRTree(readOnlyBuffer, bitmapFactory);
buffer.limit(buffer.position() + numBytes);
return new ImmutableRTree(buffer, bitmapFactory);
}
@Override

View File

@ -1,138 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.data;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Ints;
import io.druid.collections.IntList;
import it.unimi.dsi.fastutil.ints.IntIterator;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.IntBuffer;
/**
*/
public class IntBufferIndexedInts implements IndexedInts, Comparable<IntBufferIndexedInts>
{
public static ObjectStrategy<IntBufferIndexedInts> objectStrategy =
new IntBufferIndexedIntsObjectStrategy();
public static IntBufferIndexedInts fromArray(int[] array)
{
final ByteBuffer buffer = ByteBuffer.allocate(array.length * Ints.BYTES);
buffer.asIntBuffer().put(array);
return new IntBufferIndexedInts(buffer.asReadOnlyBuffer());
}
public static IntBufferIndexedInts fromIntList(IntList intList)
{
final ByteBuffer buffer = ByteBuffer.allocate(intList.length() * Ints.BYTES);
final IntBuffer intBuf = buffer.asIntBuffer();
for (int i = 0; i < intList.length(); ++i) {
intBuf.put(intList.get(i));
}
return new IntBufferIndexedInts(buffer.asReadOnlyBuffer());
}
private final ByteBuffer buffer;
public IntBufferIndexedInts(ByteBuffer buffer)
{
this.buffer = buffer;
}
@Override
public int size()
{
return buffer.remaining() / 4;
}
@Override
public int get(int index)
{
return buffer.getInt(buffer.position() + (index * 4));
}
public ByteBuffer getBuffer()
{
return buffer.asReadOnlyBuffer();
}
@Override
public int compareTo(IntBufferIndexedInts o)
{
return buffer.compareTo(o.getBuffer());
}
@Override
public IntIterator iterator()
{
return new IndexedIntsIterator(this);
}
private static class IntBufferIndexedIntsObjectStrategy implements ObjectStrategy<IntBufferIndexedInts>
{
@Override
public Class<? extends IntBufferIndexedInts> getClazz()
{
return IntBufferIndexedInts.class;
}
@Override
public IntBufferIndexedInts fromByteBuffer(ByteBuffer buffer, int numBytes)
{
final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
readOnlyBuffer.limit(readOnlyBuffer.position() + numBytes);
return new IntBufferIndexedInts(readOnlyBuffer);
}
@Override
public byte[] toBytes(IntBufferIndexedInts val)
{
ByteBuffer buffer = val.getBuffer();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
return bytes;
}
@Override
public int compare(IntBufferIndexedInts o1, IntBufferIndexedInts o2)
{
return Ordering.natural().nullsFirst().compare(o1, o2);
}
}
@Override
public void fill(int index, int[] toFill)
{
throw new UnsupportedOperationException("fill not supported");
}
@Override
public void close() throws IOException
{
}
}

View File

@ -19,20 +19,27 @@
package io.druid.segment.data;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.io.ByteSink;
import com.google.common.io.CountingOutputStream;
import com.google.common.math.LongMath;
import com.google.common.primitives.Longs;
import io.druid.common.utils.SerializerUtils;
import io.druid.java.util.common.io.smoosh.FileSmoosher;
import it.unimi.dsi.fastutil.longs.Long2IntMap;
import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.longs.LongList;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.WritableByteChannel;
/**
* Unsafe for concurrent use from multiple threads.
*/
public class IntermediateLongSupplierSerializer implements LongSupplierSerializer
{
@ -42,10 +49,13 @@ public class IntermediateLongSupplierSerializer implements LongSupplierSerialize
private final ByteOrder order;
private final CompressedObjectStrategy.CompressionStrategy compression;
private CountingOutputStream tempOut = null;
private final ByteBuffer helperBuffer = ByteBuffer.allocate(Longs.BYTES);
private int numInserted = 0;
private BiMap<Long, Integer> uniqueValues = HashBiMap.create();
private final Long2IntMap uniqueValues = new Long2IntOpenHashMap();
private final LongList valuesAddedInOrder = new LongArrayList();
private long maxVal = Long.MIN_VALUE;
private long minVal = Long.MAX_VALUE;
@ -77,10 +87,11 @@ public class IntermediateLongSupplierSerializer implements LongSupplierSerialize
public void add(long value) throws IOException
{
tempOut.write(Longs.toByteArray(value));
SerializerUtils.writeBigEndianLongToOutputStream(tempOut, value, helperBuffer);
++numInserted;
if (uniqueValues.size() <= CompressionFactory.MAX_TABLE_SIZE && !uniqueValues.containsKey(value)) {
uniqueValues.put(value, uniqueValues.size());
valuesAddedInOrder.add(value);
}
if (value > maxVal) {
maxVal = value;
@ -101,7 +112,7 @@ public class IntermediateLongSupplierSerializer implements LongSupplierSerialize
delta = -1;
}
if (uniqueValues.size() <= CompressionFactory.MAX_TABLE_SIZE) {
writer = new TableLongEncodingWriter(uniqueValues);
writer = new TableLongEncodingWriter(uniqueValues, valuesAddedInOrder);
} else if (delta != -1 && delta != Long.MAX_VALUE) {
writer = new DeltaLongEncodingWriter(minVal, delta);
} else {

View File

@ -29,8 +29,11 @@ public interface ObjectStrategy<T> extends Comparator<T>
/**
* Convert values from their underlying byte representation.
*
* Implementations of this method must not change the given buffer mark, or limit, but may modify its position.
* Use buffer.asReadOnlyBuffer() or buffer.duplicate() if mark or limit need to be set.
* Implementations of this method <i>may</i> change the given buffer's mark, or limit, and position.
*
* Implementations of this method <i>may not</i> store the given buffer in a field of the "deserialized" object,
* need to use {@link ByteBuffer#slice()}, {@link ByteBuffer#asReadOnlyBuffer()} or {@link ByteBuffer#duplicate()} in
* this case.
*
* @param buffer buffer to read value from
* @param numBytes number of bytes used to store the value, starting at buffer.position()

View File

@ -102,9 +102,8 @@ public class RoaringBitmapSerdeFactory implements BitmapSerdeFactory
@Override
public ImmutableBitmap fromByteBuffer(ByteBuffer buffer, int numBytes)
{
final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
readOnlyBuffer.limit(readOnlyBuffer.position() + numBytes);
return new WrappedImmutableRoaringBitmap(new ImmutableRoaringBitmap(readOnlyBuffer));
buffer.limit(buffer.position() + numBytes);
return new WrappedImmutableRoaringBitmap(new ImmutableRoaringBitmap(buffer));
}
@Override

View File

@ -19,10 +19,12 @@
package io.druid.segment.data;
import com.google.common.collect.BiMap;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import io.druid.common.utils.SerializerUtils;
import io.druid.java.util.common.IAE;
import it.unimi.dsi.fastutil.longs.Long2IntMap;
import it.unimi.dsi.fastutil.longs.LongList;
import java.io.IOException;
import java.io.OutputStream;
@ -31,16 +33,18 @@ import java.nio.ByteBuffer;
public class TableLongEncodingWriter implements CompressionFactory.LongEncodingWriter
{
private final BiMap<Long, Integer> table;
private final Long2IntMap table;
private final LongList valueAddedInOrder;
private final int bitsPerValue;
private VSizeLongSerde.LongSerializer serializer;
public TableLongEncodingWriter(BiMap<Long, Integer> table)
public TableLongEncodingWriter(Long2IntMap table, LongList valueAddedInOrder)
{
if (table.size() > CompressionFactory.MAX_TABLE_SIZE) {
throw new IAE("Invalid table size[%s]", table.size());
}
this.table = table;
this.valueAddedInOrder = valueAddedInOrder;
this.bitsPerValue = VSizeLongSerde.getBitsForMax(table.size());
}
@ -77,9 +81,9 @@ public class TableLongEncodingWriter implements CompressionFactory.LongEncodingW
metaOut.write(CompressionFactory.LongEncodingFormat.TABLE.getId());
metaOut.write(CompressionFactory.TABLE_ENCODING_VERSION);
metaOut.write(Ints.toByteArray(table.size()));
BiMap<Integer, Long> inverse = table.inverse();
for (int i = 0; i < table.size(); i++) {
metaOut.write(Longs.toByteArray(inverse.get(i)));
ByteBuffer helperBuffer = ByteBuffer.allocate(Longs.BYTES);
for (int i = 0; i < valueAddedInOrder.size(); i++) {
SerializerUtils.writeBigEndianLongToOutputStream(metaOut, valueAddedInOrder.getLong(i), helperBuffer);
}
}

View File

@ -20,10 +20,11 @@
package io.druid.segment.data;
import com.google.common.primitives.Ints;
import io.druid.common.utils.SerializerUtils;
import io.druid.io.ZeroCopyByteArrayOutputStream;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
@ -55,12 +56,12 @@ public class VSizeIndexed implements IndexedMultivalue<IndexedInts>
++count;
}
ByteArrayOutputStream headerBytes = new ByteArrayOutputStream(4 + (count * 4));
ByteArrayOutputStream valueBytes = new ByteArrayOutputStream();
ZeroCopyByteArrayOutputStream headerBytes = new ZeroCopyByteArrayOutputStream(4 + (count * 4));
ZeroCopyByteArrayOutputStream valueBytes = new ZeroCopyByteArrayOutputStream();
ByteBuffer helperBuffer = ByteBuffer.allocate(Ints.BYTES);
int offset = 0;
try {
headerBytes.write(Ints.toByteArray(count));
SerializerUtils.writeBigEndianIntToOutputStream(headerBytes, count, helperBuffer);
for (VSizeIndexedInts object : objectsIterable) {
if (object.getNumBytes() != numBytes) {
@ -68,7 +69,7 @@ public class VSizeIndexed implements IndexedMultivalue<IndexedInts>
}
byte[] bytes = object.getBytesNoPadding();
offset += bytes.length;
headerBytes.write(Ints.toByteArray(offset));
SerializerUtils.writeBigEndianIntToOutputStream(headerBytes, offset, helperBuffer);
valueBytes.write(bytes);
}
valueBytes.write(new byte[4 - numBytes]);
@ -78,8 +79,8 @@ public class VSizeIndexed implements IndexedMultivalue<IndexedInts>
}
ByteBuffer theBuffer = ByteBuffer.allocate(headerBytes.size() + valueBytes.size());
theBuffer.put(headerBytes.toByteArray());
theBuffer.put(valueBytes.toByteArray());
headerBytes.writeTo(theBuffer);
valueBytes.writeTo(theBuffer);
theBuffer.flip();
return new VSizeIndexed(theBuffer.asReadOnlyBuffer(), numBytes);

View File

@ -53,7 +53,7 @@ public class VSizeIndexedInts implements IndexedInts, Comparable<VSizeIndexedInt
/**
* provide for performance reason.
*/
public static byte[] getBytesNoPaddingfromList(List<Integer> list, int maxValue)
public static byte[] getBytesNoPaddingFromList(List<Integer> list, int maxValue)
{
int numBytes = getNumBytesForMax(maxValue);
@ -76,6 +76,7 @@ public class VSizeIndexedInts implements IndexedInts, Comparable<VSizeIndexedInt
private static void writeToBuffer(ByteBuffer buffer, List<Integer> list, int numBytes, int maxValue)
{
int i = 0;
ByteBuffer helperBuffer = ByteBuffer.allocate(Ints.BYTES);
for (Integer val : list) {
if (val < 0) {
throw new IAE("integer values must be positive, got[%d], i[%d]", val, i);
@ -84,8 +85,8 @@ public class VSizeIndexedInts implements IndexedInts, Comparable<VSizeIndexedInt
throw new IAE("val[%d] > maxValue[%d], please don't lie about maxValue. i[%d]", val, maxValue, i);
}
byte[] intAsBytes = Ints.toByteArray(val);
buffer.put(intAsBytes, intAsBytes.length - numBytes, numBytes);
helperBuffer.putInt(0, val);
buffer.put(helperBuffer.array(), Ints.BYTES - numBytes, numBytes);
++i;
}
buffer.position(0);

View File

@ -42,6 +42,7 @@ public class VSizeIndexedIntsWriter extends SingleValueIndexedIntsWriter
private final int numBytes;
private CountingOutputStream valuesOut = null;
private final ByteBuffer helperBuffer = ByteBuffer.allocate(Ints.BYTES);
public VSizeIndexedIntsWriter(
final IOPeon ioPeon,
@ -63,8 +64,8 @@ public class VSizeIndexedIntsWriter extends SingleValueIndexedIntsWriter
@Override
protected void addValue(int val) throws IOException
{
byte[] intAsBytes = Ints.toByteArray(val);
valuesOut.write(intAsBytes, intAsBytes.length - numBytes, numBytes);
helperBuffer.putInt(0, val);
valuesOut.write(helperBuffer.array(), Ints.BYTES - numBytes, numBytes);
}
@Override

View File

@ -84,7 +84,7 @@ public class VSizeIndexedWriter extends MultiValueIndexedIntsWriter implements C
public void write(List<Integer> ints) throws IOException
{
byte[] bytesToWrite = ints == null ? EMPTY_ARRAY : VSizeIndexedInts.getBytesNoPaddingfromList(ints, maxId);
byte[] bytesToWrite = ints == null ? EMPTY_ARRAY : VSizeIndexedInts.getBytesNoPaddingFromList(ints, maxId);
valuesOut.write(bytesToWrite);

View File

@ -42,8 +42,7 @@ public class IndexedIntsTest
{
return Arrays.asList(
new Object[][]{
{VSizeIndexedInts.fromArray(array)},
{IntBufferIndexedInts.fromArray(array)}
{VSizeIndexedInts.fromArray(array)}
}
);
}

View File

@ -73,7 +73,7 @@ public class VSizeIndexedIntsTest
int maxValue = Ints.max(array);
VSizeIndexedInts ints = VSizeIndexedInts.fromList(list, maxValue);
byte[] bytes1 = ints.getBytesNoPadding();
byte[] bytes2 = VSizeIndexedInts.getBytesNoPaddingfromList(list, maxValue);
byte[] bytes2 = VSizeIndexedInts.getBytesNoPaddingFromList(list, maxValue);
Assert.assertArrayEquals(bytes1, bytes2);
}
}