mirror of https://github.com/apache/druid.git
add compressed variable-size ints column type
This commit is contained in:
parent
ce928d9636
commit
d20128b89b
|
@ -0,0 +1,71 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.segment.data;
|
||||
|
||||
import com.google.common.collect.Ordering;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
|
||||
public class CompressedByteBufferObjectStrategy extends FixedSizeCompressedObjectStrategy<ByteBuffer>
|
||||
{
|
||||
public static final Ordering<Comparable> ORDERING = Ordering.natural().nullsFirst();
|
||||
|
||||
public static CompressedByteBufferObjectStrategy getBufferForOrder(final ByteOrder order, final CompressionStrategy compression, final int sizePer)
|
||||
{
|
||||
return new CompressedByteBufferObjectStrategy(order, compression, sizePer);
|
||||
}
|
||||
|
||||
public CompressedByteBufferObjectStrategy(
|
||||
ByteOrder order,
|
||||
CompressionStrategy compression,
|
||||
final int sizePer
|
||||
)
|
||||
{
|
||||
super(
|
||||
order, new BufferConverter<ByteBuffer>()
|
||||
{
|
||||
@Override
|
||||
public ByteBuffer convert(ByteBuffer buf)
|
||||
{
|
||||
return buf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compare(ByteBuffer lhs, ByteBuffer rhs)
|
||||
{
|
||||
return ORDERING.compare(lhs, rhs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int sizeOf(int count)
|
||||
{
|
||||
return count; // 1 byte per element
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer combine(ByteBuffer into, ByteBuffer from)
|
||||
{
|
||||
return into.put(from);
|
||||
}
|
||||
}, compression, sizePer
|
||||
);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,395 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.segment.data;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.io.Closeables;
|
||||
import com.google.common.primitives.Ints;
|
||||
import com.google.common.primitives.Shorts;
|
||||
import com.metamx.common.IAE;
|
||||
import com.metamx.common.guava.CloseQuietly;
|
||||
import io.druid.collections.ResourceHolder;
|
||||
import io.druid.collections.StupidResourceHolder;
|
||||
import io.druid.segment.CompressedPools;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
import java.nio.IntBuffer;
|
||||
import java.nio.ShortBuffer;
|
||||
import java.nio.channels.WritableByteChannel;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
public class CompressedVSizeIntsIndexedSupplier implements WritableSupplier<IndexedInts>
|
||||
{
|
||||
public static final byte version = 0x2;
|
||||
|
||||
private final int totalSize;
|
||||
private final int sizePer;
|
||||
private final int numBytes;
|
||||
private final int bigEndianShift;
|
||||
private final int littleEndianMask;
|
||||
private final GenericIndexed<ResourceHolder<ByteBuffer>> baseBuffers;
|
||||
private final CompressedObjectStrategy.CompressionStrategy compression;
|
||||
|
||||
CompressedVSizeIntsIndexedSupplier(
|
||||
int totalSize,
|
||||
int sizePer,
|
||||
int numBytes,
|
||||
GenericIndexed<ResourceHolder<ByteBuffer>> baseBuffers,
|
||||
CompressedObjectStrategy.CompressionStrategy compression
|
||||
)
|
||||
{
|
||||
Preconditions.checkArgument(
|
||||
sizePer == (1 << Integer.numberOfTrailingZeros(sizePer)),
|
||||
"Number of entries per chunk must be a power of 2"
|
||||
);
|
||||
|
||||
this.totalSize = totalSize;
|
||||
this.sizePer = sizePer;
|
||||
this.baseBuffers = baseBuffers;
|
||||
this.compression = compression;
|
||||
this.numBytes = numBytes;
|
||||
this.bigEndianShift = Integer.SIZE - (numBytes << 3); // numBytes * 8
|
||||
this.littleEndianMask = (int)((1L << (numBytes << 3)) - 1); // set numBytes * 8 lower bits to 1
|
||||
}
|
||||
|
||||
public static int maxIntsInBufferForBytes(int numBytes)
|
||||
{
|
||||
int maxSizePer = (CompressedPools.BUFFER_SIZE - bufferPadding(numBytes)) / numBytes;
|
||||
// round down to the nearest power of 2
|
||||
return 1 << (Integer.SIZE - 1 - Integer.numberOfLeadingZeros(maxSizePer));
|
||||
}
|
||||
|
||||
private static int bufferPadding(int numBytes)
|
||||
{
|
||||
// when numBytes == 3 we need to pad the buffer to allow reading an extra byte
|
||||
// beyond the end of the last value, since we use buffer.getInt() to read values.
|
||||
// for numBytes 1, 2 we remove the need for padding by reading bytes or shorts directly.
|
||||
switch (numBytes) {
|
||||
case Shorts.BYTES:
|
||||
case 1:
|
||||
return 0;
|
||||
default:
|
||||
return Ints.BYTES - numBytes;
|
||||
}
|
||||
}
|
||||
|
||||
public static int maxIntsInBufferForValue(int maxValue)
|
||||
{
|
||||
return maxIntsInBufferForBytes(VSizeIndexedInts.getNumBytesForMax(maxValue));
|
||||
}
|
||||
|
||||
public int size()
|
||||
{
|
||||
return totalSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexedInts get()
|
||||
{
|
||||
// optimized versions for int, short, and byte columns
|
||||
if(numBytes == Ints.BYTES) {
|
||||
return new CompressedFullSizeIndexedInts();
|
||||
} else if (numBytes == Shorts.BYTES) {
|
||||
return new CompressedShortSizeIndexedInts();
|
||||
} else if (numBytes == 1) {
|
||||
return new CompressedByteSizeIndexedInts();
|
||||
} else {
|
||||
// default version of everything else, i.e. 3-bytes per value
|
||||
return new CompressedVSizeIndexedInts();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public long getSerializedSize()
|
||||
{
|
||||
return 1 + // version
|
||||
1 + // numBytes
|
||||
Ints.BYTES + // totalSize
|
||||
Ints.BYTES + // sizePer
|
||||
1 + // compression id
|
||||
baseBuffers.getSerializedSize(); // data
|
||||
}
|
||||
|
||||
public void writeToChannel(WritableByteChannel channel) throws IOException
|
||||
{
|
||||
channel.write(ByteBuffer.wrap(new byte[]{version, (byte) numBytes}));
|
||||
channel.write(ByteBuffer.wrap(Ints.toByteArray(totalSize)));
|
||||
channel.write(ByteBuffer.wrap(Ints.toByteArray(sizePer)));
|
||||
channel.write(ByteBuffer.wrap(new byte[]{compression.getId()}));
|
||||
baseBuffers.writeToChannel(channel);
|
||||
}
|
||||
|
||||
/**
|
||||
* For testing. Do not use unless you like things breaking
|
||||
*/
|
||||
GenericIndexed<ResourceHolder<ByteBuffer>> getBaseBuffers()
|
||||
{
|
||||
return baseBuffers;
|
||||
}
|
||||
|
||||
public static CompressedVSizeIntsIndexedSupplier fromByteBuffer(ByteBuffer buffer, ByteOrder order)
|
||||
{
|
||||
byte versionFromBuffer = buffer.get();
|
||||
|
||||
if (versionFromBuffer == version) {
|
||||
final int numBytes = buffer.get();
|
||||
final int totalSize = buffer.getInt();
|
||||
final int sizePer = buffer.getInt();
|
||||
final int chunkBytes = sizePer * numBytes + bufferPadding(numBytes);
|
||||
|
||||
final CompressedObjectStrategy.CompressionStrategy compression = CompressedObjectStrategy.CompressionStrategy.forId(buffer.get());
|
||||
|
||||
return new CompressedVSizeIntsIndexedSupplier(
|
||||
totalSize,
|
||||
sizePer,
|
||||
numBytes,
|
||||
GenericIndexed.read(
|
||||
buffer,
|
||||
CompressedByteBufferObjectStrategy.getBufferForOrder(order, compression, chunkBytes)
|
||||
),
|
||||
compression
|
||||
);
|
||||
|
||||
}
|
||||
|
||||
throw new IAE("Unknown version[%s]", versionFromBuffer);
|
||||
}
|
||||
|
||||
public static CompressedVSizeIntsIndexedSupplier fromList(
|
||||
final List<Integer> list, final int maxValue, final int chunkFactor, final ByteOrder byteOrder, CompressedObjectStrategy.CompressionStrategy compression
|
||||
)
|
||||
{
|
||||
final int numBytes = VSizeIndexedInts.getNumBytesForMax(maxValue);
|
||||
final int chunkBytes = chunkFactor * numBytes + bufferPadding(numBytes);
|
||||
|
||||
Preconditions.checkArgument(
|
||||
chunkFactor <= maxIntsInBufferForBytes(numBytes),
|
||||
"Chunks must be <= 64k bytes. chunkFactor was[%s]",
|
||||
chunkFactor
|
||||
);
|
||||
|
||||
return new CompressedVSizeIntsIndexedSupplier(
|
||||
list.size(),
|
||||
chunkFactor,
|
||||
numBytes,
|
||||
GenericIndexed.fromIterable(
|
||||
new Iterable<ResourceHolder<ByteBuffer>>()
|
||||
{
|
||||
@Override
|
||||
public Iterator<ResourceHolder<ByteBuffer>> iterator()
|
||||
{
|
||||
return new Iterator<ResourceHolder<ByteBuffer>>()
|
||||
{
|
||||
int position = 0;
|
||||
|
||||
@Override
|
||||
public boolean hasNext()
|
||||
{
|
||||
return position < list.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResourceHolder<ByteBuffer> next()
|
||||
{
|
||||
ByteBuffer retVal = ByteBuffer
|
||||
.allocate(chunkBytes)
|
||||
.order(byteOrder);
|
||||
|
||||
if (chunkFactor > list.size() - position) {
|
||||
retVal.limit((list.size() - position) * numBytes);
|
||||
} else {
|
||||
retVal.limit(chunkFactor * numBytes);
|
||||
}
|
||||
|
||||
final List<Integer> ints = list.subList(position, position + retVal.remaining() / numBytes);
|
||||
final ByteBuffer buf = ByteBuffer
|
||||
.allocate(Ints.BYTES)
|
||||
.order(byteOrder);
|
||||
final boolean bigEndian = byteOrder.equals(ByteOrder.BIG_ENDIAN);
|
||||
for (int value : ints) {
|
||||
buf.putInt(0, value);
|
||||
if (bigEndian) {
|
||||
retVal.put(buf.array(), Ints.BYTES - numBytes, numBytes);
|
||||
} else {
|
||||
retVal.put(buf.array(), 0, numBytes);
|
||||
}
|
||||
}
|
||||
retVal.rewind();
|
||||
position += retVal.remaining() / numBytes;
|
||||
|
||||
return StupidResourceHolder.create(retVal);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove()
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
};
|
||||
}
|
||||
},
|
||||
CompressedByteBufferObjectStrategy.getBufferForOrder(byteOrder, compression, chunkBytes)
|
||||
),
|
||||
compression
|
||||
);
|
||||
}
|
||||
|
||||
private class CompressedFullSizeIndexedInts extends CompressedVSizeIndexedInts {
|
||||
IntBuffer intBuffer;
|
||||
|
||||
@Override
|
||||
protected void loadBuffer(int bufferNum)
|
||||
{
|
||||
super.loadBuffer(bufferNum);
|
||||
intBuffer = buffer.asIntBuffer();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int _get(int index)
|
||||
{
|
||||
return intBuffer.get(intBuffer.position() + index);
|
||||
}
|
||||
}
|
||||
|
||||
private class CompressedShortSizeIndexedInts extends CompressedVSizeIndexedInts {
|
||||
ShortBuffer shortBuffer;
|
||||
|
||||
@Override
|
||||
protected void loadBuffer(int bufferNum)
|
||||
{
|
||||
super.loadBuffer(bufferNum);
|
||||
shortBuffer = buffer.asShortBuffer();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int _get(int index)
|
||||
{
|
||||
// removes the need for padding
|
||||
return shortBuffer.get(shortBuffer.position() + index) & 0xFFFF;
|
||||
}
|
||||
}
|
||||
|
||||
private class CompressedByteSizeIndexedInts extends CompressedVSizeIndexedInts {
|
||||
@Override
|
||||
protected int _get(int index)
|
||||
{
|
||||
// removes the need for padding
|
||||
return buffer.get(buffer.position() + index) & 0xFF;
|
||||
}
|
||||
}
|
||||
|
||||
private class CompressedVSizeIndexedInts implements IndexedInts
|
||||
{
|
||||
final Indexed<ResourceHolder<ByteBuffer>> singleThreadedBuffers = baseBuffers.singleThreaded();
|
||||
|
||||
final int div = Integer.numberOfTrailingZeros(sizePer);
|
||||
final int rem = sizePer - 1;
|
||||
|
||||
int currIndex = -1;
|
||||
ResourceHolder<ByteBuffer> holder;
|
||||
ByteBuffer buffer;
|
||||
boolean bigEndian;
|
||||
|
||||
@Override
|
||||
public int size()
|
||||
{
|
||||
return totalSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the value at the given index into the column.
|
||||
*
|
||||
* Assumes the number of entries in each decompression buffers is a power of two.
|
||||
*
|
||||
* @param index index of the value in the column
|
||||
* @return the value at the given index
|
||||
*/
|
||||
@Override
|
||||
public int get(int index)
|
||||
{
|
||||
// assumes the number of entries in each buffer is a power of 2
|
||||
final int bufferNum = index >> div;
|
||||
|
||||
if (bufferNum != currIndex) {
|
||||
loadBuffer(bufferNum);
|
||||
}
|
||||
|
||||
return _get(index & rem);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the value at the given index in the current decompression buffer
|
||||
*
|
||||
* @param index index of the value in the curent buffer
|
||||
* @return the value at the given index
|
||||
*/
|
||||
protected int _get(final int index)
|
||||
{
|
||||
final int pos = buffer.position() + index * numBytes;
|
||||
// example for numBytes = 3
|
||||
// big-endian: 0x000c0b0a stored 0c 0b 0a XX, read 0x0c0b0aXX >>> 8
|
||||
// little-endian: 0x000c0b0a stored 0a 0b 0c XX, read 0xXX0c0b0a & 0x00FFFFFF
|
||||
return bigEndian ?
|
||||
buffer.getInt(pos) >>> bigEndianShift :
|
||||
buffer.getInt(pos) & littleEndianMask;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<Integer> iterator()
|
||||
{
|
||||
return new IndexedIntsIterator(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fill(int index, int[] toFill)
|
||||
{
|
||||
throw new UnsupportedOperationException("fill not supported");
|
||||
}
|
||||
|
||||
protected void loadBuffer(int bufferNum)
|
||||
{
|
||||
CloseQuietly.close(holder);
|
||||
holder = singleThreadedBuffers.get(bufferNum);
|
||||
buffer = holder.get();
|
||||
currIndex = bufferNum;
|
||||
bigEndian = buffer.order().equals(ByteOrder.BIG_ENDIAN);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "CompressedVSizedIntsIndexedSupplier{" +
|
||||
"currIndex=" + currIndex +
|
||||
", sizePer=" + sizePer +
|
||||
", numChunks=" + singleThreadedBuffers.size() +
|
||||
", totalSize=" + totalSize +
|
||||
'}';
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
Closeables.close(holder, false);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,371 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.segment.data;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.primitives.Ints;
|
||||
import com.google.common.primitives.Longs;
|
||||
import com.metamx.common.guava.CloseQuietly;
|
||||
import io.druid.segment.CompressedPools;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
import java.nio.channels.Channels;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class CompressedVSizeIntsIndexedSupplierTest extends CompressionStrategyTest
|
||||
{
|
||||
@Parameterized.Parameters(name = "{index}: compression={0}, byteOrder={1}")
|
||||
public static Iterable<Object[]> compressionStrategies()
|
||||
{
|
||||
final Iterable<CompressedObjectStrategy.CompressionStrategy> compressionStrategies = Iterables.transform(
|
||||
CompressionStrategyTest.compressionStrategies(),
|
||||
new Function<Object[], CompressedObjectStrategy.CompressionStrategy>()
|
||||
{
|
||||
@Override
|
||||
public CompressedObjectStrategy.CompressionStrategy apply(Object[] input)
|
||||
{
|
||||
return (CompressedObjectStrategy.CompressionStrategy) input[0];
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
Set<List<Object>> combinations = Sets.cartesianProduct(
|
||||
Sets.newHashSet(compressionStrategies),
|
||||
Sets.newHashSet(ByteOrder.BIG_ENDIAN, ByteOrder.LITTLE_ENDIAN)
|
||||
);
|
||||
|
||||
return Iterables.transform(
|
||||
combinations, new Function<List, Object[]>()
|
||||
{
|
||||
@Override
|
||||
public Object[] apply(List input)
|
||||
{
|
||||
return new Object[]{input.get(0), input.get(1)};
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
private static final int[] MAX_VALUES = new int[] { 0xFF, 0xFFFF, 0xFFFFFF, 0x0FFFFFFF };
|
||||
|
||||
public CompressedVSizeIntsIndexedSupplierTest(CompressedObjectStrategy.CompressionStrategy compressionStrategy, ByteOrder byteOrder)
|
||||
{
|
||||
super(compressionStrategy);
|
||||
this.byteOrder = byteOrder;
|
||||
}
|
||||
|
||||
private IndexedInts indexed;
|
||||
private CompressedVSizeIntsIndexedSupplier supplier;
|
||||
private int[] vals;
|
||||
private final ByteOrder byteOrder;
|
||||
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception
|
||||
{
|
||||
CloseQuietly.close(indexed);
|
||||
indexed = null;
|
||||
supplier = null;
|
||||
vals = null;
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception
|
||||
{
|
||||
CloseQuietly.close(indexed);
|
||||
}
|
||||
|
||||
private void setupSimple(final int chunkSize)
|
||||
{
|
||||
CloseQuietly.close(indexed);
|
||||
|
||||
vals = new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16};
|
||||
|
||||
supplier = CompressedVSizeIntsIndexedSupplier.fromList(
|
||||
Ints.asList(vals),
|
||||
Ints.max(vals),
|
||||
chunkSize,
|
||||
ByteOrder.nativeOrder(),
|
||||
compressionStrategy
|
||||
);
|
||||
|
||||
indexed = supplier.get();
|
||||
}
|
||||
|
||||
private void setupSimpleWithSerde(final int chunkSize) throws IOException
|
||||
{
|
||||
vals = new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16};
|
||||
|
||||
makeWithSerde(chunkSize);
|
||||
}
|
||||
|
||||
private void makeWithSerde(final int chunkSize) throws IOException
|
||||
{
|
||||
CloseQuietly.close(indexed);
|
||||
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
final CompressedVSizeIntsIndexedSupplier theSupplier = CompressedVSizeIntsIndexedSupplier.fromList(
|
||||
Ints.asList(vals), Ints.max(vals), chunkSize, byteOrder, compressionStrategy
|
||||
);
|
||||
theSupplier.writeToChannel(Channels.newChannel(baos));
|
||||
|
||||
final byte[] bytes = baos.toByteArray();
|
||||
Assert.assertEquals(theSupplier.getSerializedSize(), bytes.length);
|
||||
|
||||
supplier = CompressedVSizeIntsIndexedSupplier.fromByteBuffer(ByteBuffer.wrap(bytes), byteOrder);
|
||||
indexed = supplier.get();
|
||||
}
|
||||
|
||||
private void setupLargeChunks(final int chunkSize, final int totalSize, final int maxValue) throws IOException
|
||||
{
|
||||
vals = new int[totalSize];
|
||||
Random rand = new Random(0);
|
||||
for(int i = 0; i < vals.length; ++i) {
|
||||
// VSizeIndexed only allows positive values
|
||||
vals[i] = rand.nextInt(maxValue);
|
||||
}
|
||||
|
||||
makeWithSerde(chunkSize);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSanity() throws Exception
|
||||
{
|
||||
setupSimple(2);
|
||||
Assert.assertEquals(8, supplier.getBaseBuffers().size());
|
||||
assertIndexMatchesVals();
|
||||
|
||||
setupSimple(4);
|
||||
Assert.assertEquals(4, supplier.getBaseBuffers().size());
|
||||
assertIndexMatchesVals();
|
||||
|
||||
setupSimple(32);
|
||||
Assert.assertEquals(1, supplier.getBaseBuffers().size());
|
||||
assertIndexMatchesVals();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLargeChunks() throws Exception
|
||||
{
|
||||
for (int maxValue : MAX_VALUES) {
|
||||
final int maxChunkSize = CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForValue(maxValue);
|
||||
|
||||
setupLargeChunks(maxChunkSize, 10 * maxChunkSize, maxValue);
|
||||
Assert.assertEquals(10, supplier.getBaseBuffers().size());
|
||||
assertIndexMatchesVals();
|
||||
|
||||
setupLargeChunks(maxChunkSize, 10 * maxChunkSize + 1, maxValue);
|
||||
Assert.assertEquals(11, supplier.getBaseBuffers().size());
|
||||
assertIndexMatchesVals();
|
||||
|
||||
setupLargeChunks(1, 0xFFFF, maxValue);
|
||||
Assert.assertEquals(0xFFFF, supplier.getBaseBuffers().size());
|
||||
assertIndexMatchesVals();
|
||||
|
||||
setupLargeChunks(maxChunkSize / 2, 10 * (maxChunkSize / 2) + 1, maxValue);
|
||||
Assert.assertEquals(11, supplier.getBaseBuffers().size());
|
||||
assertIndexMatchesVals();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testChunkTooBig() throws Exception
|
||||
{
|
||||
for(int maxValue : MAX_VALUES) {
|
||||
final int maxChunkSize = CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForValue(maxValue);
|
||||
try {
|
||||
setupLargeChunks(maxChunkSize + 1, 10 * (maxChunkSize + 1), maxValue);
|
||||
Assert.fail();
|
||||
} catch(IllegalArgumentException e) {
|
||||
Assert.assertTrue("chunk too big for maxValue " + maxValue, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testmaxIntsInBuffer() throws Exception
|
||||
{
|
||||
Assert.assertEquals(CompressedPools.BUFFER_SIZE, CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForBytes(1));
|
||||
Assert.assertEquals(CompressedPools.BUFFER_SIZE / 2, CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForBytes(2));
|
||||
Assert.assertEquals(CompressedPools.BUFFER_SIZE / 4, CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForBytes(4));
|
||||
|
||||
Assert.assertEquals(CompressedPools.BUFFER_SIZE, 0x10000); // nearest power of 2 is 2^14
|
||||
Assert.assertEquals(1 << 14, CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForBytes(3));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSanityWithSerde() throws Exception
|
||||
{
|
||||
setupSimpleWithSerde(4);
|
||||
|
||||
Assert.assertEquals(4, supplier.getBaseBuffers().size());
|
||||
assertIndexMatchesVals();
|
||||
|
||||
setupSimpleWithSerde(2);
|
||||
|
||||
Assert.assertEquals(8, supplier.getBaseBuffers().size());
|
||||
assertIndexMatchesVals();
|
||||
}
|
||||
|
||||
|
||||
// This test attempts to cause a race condition with the DirectByteBuffers, it's non-deterministic in causing it,
|
||||
// which sucks but I can't think of a way to deterministically cause it...
|
||||
@Test
|
||||
public void testConcurrentThreadReads() throws Exception
|
||||
{
|
||||
setupSimple(4);
|
||||
|
||||
final AtomicReference<String> reason = new AtomicReference<>("none");
|
||||
|
||||
final int numRuns = 1000;
|
||||
final CountDownLatch startLatch = new CountDownLatch(1);
|
||||
final CountDownLatch stopLatch = new CountDownLatch(2);
|
||||
final AtomicBoolean failureHappened = new AtomicBoolean(false);
|
||||
new Thread(new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
try {
|
||||
startLatch.await();
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
failureHappened.set(true);
|
||||
reason.set("interrupt.");
|
||||
stopLatch.countDown();
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
for (int i = 0; i < numRuns; ++i) {
|
||||
for (int j = 0; j < indexed.size(); ++j) {
|
||||
final long val = vals[j];
|
||||
final long indexedVal = indexed.get(j);
|
||||
if (Longs.compare(val, indexedVal) != 0) {
|
||||
failureHappened.set(true);
|
||||
reason.set(String.format("Thread1[%d]: %d != %d", j, val, indexedVal));
|
||||
stopLatch.countDown();
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
failureHappened.set(true);
|
||||
reason.set(e.getMessage());
|
||||
}
|
||||
|
||||
stopLatch.countDown();
|
||||
}
|
||||
}).start();
|
||||
|
||||
final IndexedInts indexed2 = supplier.get();
|
||||
try {
|
||||
new Thread(new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
try {
|
||||
startLatch.await();
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
stopLatch.countDown();
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
for (int i = 0; i < numRuns; ++i) {
|
||||
for (int j = indexed2.size() - 1; j >= 0; --j) {
|
||||
final long val = vals[j];
|
||||
final long indexedVal = indexed2.get(j);
|
||||
if (Longs.compare(val, indexedVal) != 0) {
|
||||
failureHappened.set(true);
|
||||
reason.set(String.format("Thread2[%d]: %d != %d", j, val, indexedVal));
|
||||
stopLatch.countDown();
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
reason.set(e.getMessage());
|
||||
failureHappened.set(true);
|
||||
}
|
||||
|
||||
stopLatch.countDown();
|
||||
}
|
||||
}).start();
|
||||
|
||||
startLatch.countDown();
|
||||
|
||||
stopLatch.await();
|
||||
}
|
||||
finally {
|
||||
CloseQuietly.close(indexed2);
|
||||
}
|
||||
|
||||
if (failureHappened.get()) {
|
||||
Assert.fail("Failure happened. Reason: " + reason.get());
|
||||
}
|
||||
}
|
||||
|
||||
private void assertIndexMatchesVals()
|
||||
{
|
||||
Assert.assertEquals(vals.length, indexed.size());
|
||||
|
||||
// sequential access
|
||||
int[] indices = new int[vals.length];
|
||||
for (int i = 0; i < indexed.size(); ++i) {
|
||||
final int expected = vals[i];
|
||||
final int actual = indexed.get(i);
|
||||
Assert.assertEquals(expected, actual);
|
||||
indices[i] = i;
|
||||
}
|
||||
|
||||
Collections.shuffle(Arrays.asList(indices));
|
||||
// random access
|
||||
for (int i = 0; i < indexed.size(); ++i) {
|
||||
int k = indices[i];
|
||||
Assert.assertEquals(vals[k], indexed.get(k));
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue