mirror of https://github.com/apache/druid.git
add compressed ints column type
This commit is contained in:
parent
5c23679238
commit
ce928d9636
|
@ -0,0 +1,72 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.segment.data;
|
||||||
|
|
||||||
|
import com.google.common.collect.Ordering;
|
||||||
|
import com.google.common.primitives.Ints;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.ByteOrder;
|
||||||
|
import java.nio.IntBuffer;
|
||||||
|
|
||||||
|
public class CompressedIntBufferObjectStrategy extends FixedSizeCompressedObjectStrategy<IntBuffer>
|
||||||
|
{
|
||||||
|
public static final Ordering<Comparable> ORDERING = Ordering.natural().nullsFirst();
|
||||||
|
|
||||||
|
public static CompressedIntBufferObjectStrategy getBufferForOrder(final ByteOrder order, final CompressionStrategy compression, final int sizePer)
|
||||||
|
{
|
||||||
|
return new CompressedIntBufferObjectStrategy(order, compression, sizePer);
|
||||||
|
}
|
||||||
|
|
||||||
|
private CompressedIntBufferObjectStrategy(final ByteOrder order, final CompressionStrategy compression, final int sizePer)
|
||||||
|
{
|
||||||
|
super(
|
||||||
|
order,
|
||||||
|
new BufferConverter<IntBuffer>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public IntBuffer convert(ByteBuffer buf)
|
||||||
|
{
|
||||||
|
return buf.asIntBuffer();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compare(IntBuffer lhs, IntBuffer rhs)
|
||||||
|
{
|
||||||
|
return ORDERING.compare(lhs, rhs);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int sizeOf(int count)
|
||||||
|
{
|
||||||
|
return count * Ints.BYTES;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public IntBuffer combine(ByteBuffer into, IntBuffer from)
|
||||||
|
{
|
||||||
|
return into.asIntBuffer().put(from);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
compression,
|
||||||
|
sizePer
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,357 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.segment.data;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import com.google.common.io.Closeables;
|
||||||
|
import com.google.common.primitives.Ints;
|
||||||
|
import com.metamx.common.IAE;
|
||||||
|
import com.metamx.common.guava.CloseQuietly;
|
||||||
|
import io.druid.collections.ResourceHolder;
|
||||||
|
import io.druid.collections.StupidResourceHolder;
|
||||||
|
import io.druid.segment.CompressedPools;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.ByteOrder;
|
||||||
|
import java.nio.IntBuffer;
|
||||||
|
import java.nio.channels.WritableByteChannel;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public class CompressedIntsIndexedSupplier implements WritableSupplier<IndexedInts>
|
||||||
|
{
|
||||||
|
public static final byte version = 0x2;
|
||||||
|
public static final int MAX_INTS_IN_BUFFER = CompressedPools.BUFFER_SIZE / Ints.BYTES;
|
||||||
|
|
||||||
|
|
||||||
|
private final int totalSize;
|
||||||
|
private final int sizePer;
|
||||||
|
private final GenericIndexed<ResourceHolder<IntBuffer>> baseIntBuffers;
|
||||||
|
private final CompressedObjectStrategy.CompressionStrategy compression;
|
||||||
|
|
||||||
|
CompressedIntsIndexedSupplier(
|
||||||
|
int totalSize,
|
||||||
|
int sizePer,
|
||||||
|
GenericIndexed<ResourceHolder<IntBuffer>> baseIntBuffers,
|
||||||
|
CompressedObjectStrategy.CompressionStrategy compression
|
||||||
|
)
|
||||||
|
{
|
||||||
|
this.totalSize = totalSize;
|
||||||
|
this.sizePer = sizePer;
|
||||||
|
this.baseIntBuffers = baseIntBuffers;
|
||||||
|
this.compression = compression;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int size()
|
||||||
|
{
|
||||||
|
return totalSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public IndexedInts get()
|
||||||
|
{
|
||||||
|
final int div = Integer.numberOfTrailingZeros(sizePer);
|
||||||
|
final int rem = sizePer - 1;
|
||||||
|
final boolean powerOf2 = sizePer == (1 << div);
|
||||||
|
if(powerOf2) {
|
||||||
|
return new CompressedIndexedInts() {
|
||||||
|
@Override
|
||||||
|
public int get(int index)
|
||||||
|
{
|
||||||
|
// optimize division and remainder for powers of 2
|
||||||
|
final int bufferNum = index >> div;
|
||||||
|
|
||||||
|
if (bufferNum != currIndex) {
|
||||||
|
loadBuffer(bufferNum);
|
||||||
|
}
|
||||||
|
|
||||||
|
final int bufferIndex = index & rem;
|
||||||
|
return buffer.get(buffer.position() + bufferIndex);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
return new CompressedIndexedInts();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getSerializedSize()
|
||||||
|
{
|
||||||
|
return 1 + // version
|
||||||
|
4 + // totalSize
|
||||||
|
4 + // sizePer
|
||||||
|
1 + // compressionId
|
||||||
|
baseIntBuffers.getSerializedSize(); // data
|
||||||
|
}
|
||||||
|
|
||||||
|
public void writeToChannel(WritableByteChannel channel) throws IOException
|
||||||
|
{
|
||||||
|
channel.write(ByteBuffer.wrap(new byte[]{version}));
|
||||||
|
channel.write(ByteBuffer.wrap(Ints.toByteArray(totalSize)));
|
||||||
|
channel.write(ByteBuffer.wrap(Ints.toByteArray(sizePer)));
|
||||||
|
channel.write(ByteBuffer.wrap(new byte[]{compression.getId()}));
|
||||||
|
baseIntBuffers.writeToChannel(channel);
|
||||||
|
}
|
||||||
|
|
||||||
|
public CompressedIntsIndexedSupplier convertByteOrder(ByteOrder order)
|
||||||
|
{
|
||||||
|
return new CompressedIntsIndexedSupplier(
|
||||||
|
totalSize,
|
||||||
|
sizePer,
|
||||||
|
GenericIndexed.fromIterable(baseIntBuffers, CompressedIntBufferObjectStrategy.getBufferForOrder(order, compression, sizePer)),
|
||||||
|
compression
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* For testing. Do not use unless you like things breaking
|
||||||
|
*/
|
||||||
|
GenericIndexed<ResourceHolder<IntBuffer>> getBaseIntBuffers()
|
||||||
|
{
|
||||||
|
return baseIntBuffers;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static CompressedIntsIndexedSupplier fromByteBuffer(ByteBuffer buffer, ByteOrder order)
|
||||||
|
{
|
||||||
|
byte versionFromBuffer = buffer.get();
|
||||||
|
|
||||||
|
if (versionFromBuffer == version) {
|
||||||
|
final int totalSize = buffer.getInt();
|
||||||
|
final int sizePer = buffer.getInt();
|
||||||
|
final CompressedObjectStrategy.CompressionStrategy compression = CompressedObjectStrategy.CompressionStrategy.forId(buffer.get());
|
||||||
|
return new CompressedIntsIndexedSupplier(
|
||||||
|
totalSize,
|
||||||
|
sizePer,
|
||||||
|
GenericIndexed.read(buffer, CompressedIntBufferObjectStrategy.getBufferForOrder(order, compression, sizePer)),
|
||||||
|
compression
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new IAE("Unknown version[%s]", versionFromBuffer);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static CompressedIntsIndexedSupplier fromIntBuffer(IntBuffer buffer, final ByteOrder byteOrder, CompressedObjectStrategy.CompressionStrategy compression)
|
||||||
|
{
|
||||||
|
return fromIntBuffer(buffer, MAX_INTS_IN_BUFFER, byteOrder, compression);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static CompressedIntsIndexedSupplier fromIntBuffer(
|
||||||
|
final IntBuffer buffer, final int chunkFactor, final ByteOrder byteOrder, CompressedObjectStrategy.CompressionStrategy compression
|
||||||
|
)
|
||||||
|
{
|
||||||
|
Preconditions.checkArgument(
|
||||||
|
chunkFactor <= MAX_INTS_IN_BUFFER, "Chunks must be <= 64k bytes. chunkFactor was[%s]", chunkFactor
|
||||||
|
);
|
||||||
|
|
||||||
|
return new CompressedIntsIndexedSupplier(
|
||||||
|
buffer.remaining(),
|
||||||
|
chunkFactor,
|
||||||
|
GenericIndexed.fromIterable(
|
||||||
|
new Iterable<ResourceHolder<IntBuffer>>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Iterator<ResourceHolder<IntBuffer>> iterator()
|
||||||
|
{
|
||||||
|
return new Iterator<ResourceHolder<IntBuffer>>()
|
||||||
|
{
|
||||||
|
IntBuffer myBuffer = buffer.asReadOnlyBuffer();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasNext()
|
||||||
|
{
|
||||||
|
return myBuffer.hasRemaining();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ResourceHolder<IntBuffer> next()
|
||||||
|
{
|
||||||
|
IntBuffer retVal = myBuffer.asReadOnlyBuffer();
|
||||||
|
|
||||||
|
if (chunkFactor < myBuffer.remaining()) {
|
||||||
|
retVal.limit(retVal.position() + chunkFactor);
|
||||||
|
}
|
||||||
|
myBuffer.position(myBuffer.position() + retVal.remaining());
|
||||||
|
|
||||||
|
return StupidResourceHolder.create(retVal);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void remove()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
},
|
||||||
|
CompressedIntBufferObjectStrategy.getBufferForOrder(byteOrder, compression, chunkFactor)
|
||||||
|
),
|
||||||
|
compression
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static CompressedIntsIndexedSupplier fromList(
|
||||||
|
final List<Integer> list , final int chunkFactor, final ByteOrder byteOrder, CompressedObjectStrategy.CompressionStrategy compression
|
||||||
|
)
|
||||||
|
{
|
||||||
|
Preconditions.checkArgument(
|
||||||
|
chunkFactor <= MAX_INTS_IN_BUFFER, "Chunks must be <= 64k bytes. chunkFactor was[%s]", chunkFactor
|
||||||
|
);
|
||||||
|
|
||||||
|
return new CompressedIntsIndexedSupplier(
|
||||||
|
list.size(),
|
||||||
|
chunkFactor,
|
||||||
|
GenericIndexed.fromIterable(
|
||||||
|
new Iterable<ResourceHolder<IntBuffer>>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Iterator<ResourceHolder<IntBuffer>> iterator()
|
||||||
|
{
|
||||||
|
return new Iterator<ResourceHolder<IntBuffer>>()
|
||||||
|
{
|
||||||
|
int position = 0;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasNext()
|
||||||
|
{
|
||||||
|
return position < list.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ResourceHolder<IntBuffer> next()
|
||||||
|
{
|
||||||
|
IntBuffer retVal = IntBuffer.allocate(chunkFactor);
|
||||||
|
|
||||||
|
if (chunkFactor > list.size() - position) {
|
||||||
|
retVal.limit(list.size() - position);
|
||||||
|
}
|
||||||
|
final List<Integer> ints = list.subList(position, position + retVal.remaining());
|
||||||
|
for(int value : ints) {
|
||||||
|
retVal.put(value);
|
||||||
|
}
|
||||||
|
retVal.rewind();
|
||||||
|
position += retVal.remaining();
|
||||||
|
|
||||||
|
return StupidResourceHolder.create(retVal);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void remove()
|
||||||
|
{
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
},
|
||||||
|
CompressedIntBufferObjectStrategy.getBufferForOrder(byteOrder, compression, chunkFactor)
|
||||||
|
),
|
||||||
|
compression
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private class CompressedIndexedInts implements IndexedInts
|
||||||
|
{
|
||||||
|
final Indexed<ResourceHolder<IntBuffer>> singleThreadedIntBuffers = baseIntBuffers.singleThreaded();
|
||||||
|
|
||||||
|
int currIndex = -1;
|
||||||
|
ResourceHolder<IntBuffer> holder;
|
||||||
|
IntBuffer buffer;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int size()
|
||||||
|
{
|
||||||
|
return totalSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int get(int index)
|
||||||
|
{
|
||||||
|
final int bufferNum = index / sizePer;
|
||||||
|
final int bufferIndex = index % sizePer;
|
||||||
|
|
||||||
|
if (bufferNum != currIndex) {
|
||||||
|
loadBuffer(bufferNum);
|
||||||
|
}
|
||||||
|
|
||||||
|
return buffer.get(buffer.position() + bufferIndex);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Iterator<Integer> iterator()
|
||||||
|
{
|
||||||
|
return new IndexedIntsIterator(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void fill(int index, int[] toFill)
|
||||||
|
{
|
||||||
|
if (totalSize - index < toFill.length) {
|
||||||
|
throw new IndexOutOfBoundsException(
|
||||||
|
String.format(
|
||||||
|
"Cannot fill array of size[%,d] at index[%,d]. Max size[%,d]", toFill.length, index, totalSize
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
int bufferNum = index / sizePer;
|
||||||
|
int bufferIndex = index % sizePer;
|
||||||
|
|
||||||
|
int leftToFill = toFill.length;
|
||||||
|
while (leftToFill > 0) {
|
||||||
|
if (bufferNum != currIndex) {
|
||||||
|
loadBuffer(bufferNum);
|
||||||
|
}
|
||||||
|
|
||||||
|
buffer.mark();
|
||||||
|
buffer.position(buffer.position() + bufferIndex);
|
||||||
|
final int numToGet = Math.min(buffer.remaining(), leftToFill);
|
||||||
|
buffer.get(toFill, toFill.length - leftToFill, numToGet);
|
||||||
|
buffer.reset();
|
||||||
|
leftToFill -= numToGet;
|
||||||
|
++bufferNum;
|
||||||
|
bufferIndex = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void loadBuffer(int bufferNum)
|
||||||
|
{
|
||||||
|
CloseQuietly.close(holder);
|
||||||
|
holder = singleThreadedIntBuffers.get(bufferNum);
|
||||||
|
buffer = holder.get();
|
||||||
|
currIndex = bufferNum;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString()
|
||||||
|
{
|
||||||
|
return "CompressedIntsIndexedSupplier_Anonymous{" +
|
||||||
|
"currIndex=" + currIndex +
|
||||||
|
", sizePer=" + sizePer +
|
||||||
|
", numChunks=" + singleThreadedIntBuffers.size() +
|
||||||
|
", totalSize=" + totalSize +
|
||||||
|
'}';
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException
|
||||||
|
{
|
||||||
|
Closeables.close(holder, false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,341 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.segment.data;
|
||||||
|
|
||||||
|
import com.google.common.primitives.Ints;
|
||||||
|
import com.google.common.primitives.Longs;
|
||||||
|
import com.metamx.common.guava.CloseQuietly;
|
||||||
|
import io.druid.segment.CompressedPools;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.ByteOrder;
|
||||||
|
import java.nio.IntBuffer;
|
||||||
|
import java.nio.channels.Channels;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
public class CompressedIntsIndexedSupplierTest extends CompressionStrategyTest
|
||||||
|
{
|
||||||
|
public CompressedIntsIndexedSupplierTest(CompressedObjectStrategy.CompressionStrategy compressionStrategy)
|
||||||
|
{
|
||||||
|
super(compressionStrategy);
|
||||||
|
}
|
||||||
|
|
||||||
|
private IndexedInts indexed;
|
||||||
|
private CompressedIntsIndexedSupplier supplier;
|
||||||
|
private int[] vals;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception
|
||||||
|
{
|
||||||
|
CloseQuietly.close(indexed);
|
||||||
|
indexed = null;
|
||||||
|
supplier = null;
|
||||||
|
vals = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception
|
||||||
|
{
|
||||||
|
CloseQuietly.close(indexed);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setupSimple(final int chunkSize)
|
||||||
|
{
|
||||||
|
CloseQuietly.close(indexed);
|
||||||
|
|
||||||
|
vals = new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16};
|
||||||
|
|
||||||
|
supplier = CompressedIntsIndexedSupplier.fromIntBuffer(
|
||||||
|
IntBuffer.wrap(vals),
|
||||||
|
chunkSize,
|
||||||
|
ByteOrder.nativeOrder(),
|
||||||
|
compressionStrategy
|
||||||
|
);
|
||||||
|
|
||||||
|
indexed = supplier.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setupSimpleWithSerde(final int chunkSize) throws IOException
|
||||||
|
{
|
||||||
|
vals = new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16};
|
||||||
|
|
||||||
|
makeWithSerde(chunkSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void makeWithSerde(final int chunkSize) throws IOException
|
||||||
|
{
|
||||||
|
CloseQuietly.close(indexed);
|
||||||
|
|
||||||
|
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||||
|
final CompressedIntsIndexedSupplier theSupplier = CompressedIntsIndexedSupplier.fromIntBuffer(
|
||||||
|
IntBuffer.wrap(vals), chunkSize, ByteOrder.nativeOrder(), compressionStrategy
|
||||||
|
);
|
||||||
|
theSupplier.writeToChannel(Channels.newChannel(baos));
|
||||||
|
|
||||||
|
final byte[] bytes = baos.toByteArray();
|
||||||
|
Assert.assertEquals(theSupplier.getSerializedSize(), bytes.length);
|
||||||
|
|
||||||
|
supplier = CompressedIntsIndexedSupplier.fromByteBuffer(ByteBuffer.wrap(bytes), ByteOrder.nativeOrder());
|
||||||
|
indexed = supplier.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setupLargeChunks(final int chunkSize, final int totalSize) throws IOException
|
||||||
|
{
|
||||||
|
vals = new int[totalSize];
|
||||||
|
Random rand = new Random(0);
|
||||||
|
for(int i = 0; i < vals.length; ++i) {
|
||||||
|
vals[i] = rand.nextInt();
|
||||||
|
}
|
||||||
|
|
||||||
|
makeWithSerde(chunkSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSanity() throws Exception
|
||||||
|
{
|
||||||
|
setupSimple(5);
|
||||||
|
|
||||||
|
Assert.assertEquals(4, supplier.getBaseIntBuffers().size());
|
||||||
|
assertIndexMatchesVals();
|
||||||
|
|
||||||
|
// test powers of 2
|
||||||
|
setupSimple(4);
|
||||||
|
Assert.assertEquals(4, supplier.getBaseIntBuffers().size());
|
||||||
|
assertIndexMatchesVals();
|
||||||
|
|
||||||
|
setupSimple(32);
|
||||||
|
Assert.assertEquals(1, supplier.getBaseIntBuffers().size());
|
||||||
|
assertIndexMatchesVals();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLargeChunks() throws Exception
|
||||||
|
{
|
||||||
|
final int maxChunkSize = CompressedPools.BUFFER_SIZE / Longs.BYTES;
|
||||||
|
|
||||||
|
setupLargeChunks(maxChunkSize, 10 * maxChunkSize);
|
||||||
|
Assert.assertEquals(10, supplier.getBaseIntBuffers().size());
|
||||||
|
assertIndexMatchesVals();
|
||||||
|
|
||||||
|
setupLargeChunks(maxChunkSize, 10 * maxChunkSize + 1);
|
||||||
|
Assert.assertEquals(11, supplier.getBaseIntBuffers().size());
|
||||||
|
assertIndexMatchesVals();
|
||||||
|
|
||||||
|
setupLargeChunks(maxChunkSize - 1, 10 * (maxChunkSize - 1) + 1);
|
||||||
|
Assert.assertEquals(11, supplier.getBaseIntBuffers().size());
|
||||||
|
assertIndexMatchesVals();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = IllegalArgumentException.class)
|
||||||
|
public void testChunkTooBig() throws Exception
|
||||||
|
{
|
||||||
|
final int maxChunkSize = CompressedPools.BUFFER_SIZE / Ints.BYTES;
|
||||||
|
setupLargeChunks(maxChunkSize + 1, 10 * (maxChunkSize + 1));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBulkFill() throws Exception
|
||||||
|
{
|
||||||
|
setupSimple(5);
|
||||||
|
|
||||||
|
tryFill(0, 15);
|
||||||
|
tryFill(3, 6);
|
||||||
|
tryFill(7, 7);
|
||||||
|
tryFill(7, 9);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = IndexOutOfBoundsException.class)
|
||||||
|
public void testBulkFillTooMuch() throws Exception
|
||||||
|
{
|
||||||
|
setupSimple(5);
|
||||||
|
tryFill(7, 10);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSanityWithSerde() throws Exception
|
||||||
|
{
|
||||||
|
setupSimpleWithSerde(5);
|
||||||
|
|
||||||
|
Assert.assertEquals(4, supplier.getBaseIntBuffers().size());
|
||||||
|
assertIndexMatchesVals();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBulkFillWithSerde() throws Exception
|
||||||
|
{
|
||||||
|
setupSimpleWithSerde(5);
|
||||||
|
|
||||||
|
tryFill(0, 15);
|
||||||
|
tryFill(3, 6);
|
||||||
|
tryFill(7, 7);
|
||||||
|
tryFill(7, 9);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = IndexOutOfBoundsException.class)
|
||||||
|
public void testBulkFillTooMuchWithSerde() throws Exception
|
||||||
|
{
|
||||||
|
setupSimpleWithSerde(5);
|
||||||
|
tryFill(7, 10);
|
||||||
|
}
|
||||||
|
|
||||||
|
// This test attempts to cause a race condition with the DirectByteBuffers, it's non-deterministic in causing it,
|
||||||
|
// which sucks but I can't think of a way to deterministically cause it...
|
||||||
|
@Test
|
||||||
|
public void testConcurrentThreadReads() throws Exception
|
||||||
|
{
|
||||||
|
setupSimple(5);
|
||||||
|
|
||||||
|
final AtomicReference<String> reason = new AtomicReference<String>("none");
|
||||||
|
|
||||||
|
final int numRuns = 1000;
|
||||||
|
final CountDownLatch startLatch = new CountDownLatch(1);
|
||||||
|
final CountDownLatch stopLatch = new CountDownLatch(2);
|
||||||
|
final AtomicBoolean failureHappened = new AtomicBoolean(false);
|
||||||
|
new Thread(new Runnable()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void run()
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
startLatch.await();
|
||||||
|
}
|
||||||
|
catch (InterruptedException e) {
|
||||||
|
failureHappened.set(true);
|
||||||
|
reason.set("interrupt.");
|
||||||
|
stopLatch.countDown();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
for (int i = 0; i < numRuns; ++i) {
|
||||||
|
for (int j = 0; j < indexed.size(); ++j) {
|
||||||
|
final long val = vals[j];
|
||||||
|
final long indexedVal = indexed.get(j);
|
||||||
|
if (Longs.compare(val, indexedVal) != 0) {
|
||||||
|
failureHappened.set(true);
|
||||||
|
reason.set(String.format("Thread1[%d]: %d != %d", j, val, indexedVal));
|
||||||
|
stopLatch.countDown();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
failureHappened.set(true);
|
||||||
|
reason.set(e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
stopLatch.countDown();
|
||||||
|
}
|
||||||
|
}).start();
|
||||||
|
|
||||||
|
final IndexedInts indexed2 = supplier.get();
|
||||||
|
try {
|
||||||
|
new Thread(new Runnable()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void run()
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
startLatch.await();
|
||||||
|
}
|
||||||
|
catch (InterruptedException e) {
|
||||||
|
stopLatch.countDown();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
for (int i = 0; i < numRuns; ++i) {
|
||||||
|
for (int j = indexed2.size() - 1; j >= 0; --j) {
|
||||||
|
final long val = vals[j];
|
||||||
|
final long indexedVal = indexed2.get(j);
|
||||||
|
if (Longs.compare(val, indexedVal) != 0) {
|
||||||
|
failureHappened.set(true);
|
||||||
|
reason.set(String.format("Thread2[%d]: %d != %d", j, val, indexedVal));
|
||||||
|
stopLatch.countDown();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
reason.set(e.getMessage());
|
||||||
|
failureHappened.set(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
stopLatch.countDown();
|
||||||
|
}
|
||||||
|
}).start();
|
||||||
|
|
||||||
|
startLatch.countDown();
|
||||||
|
|
||||||
|
stopLatch.await();
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
CloseQuietly.close(indexed2);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (failureHappened.get()) {
|
||||||
|
Assert.fail("Failure happened. Reason: " + reason.get());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void tryFill(final int startIndex, final int size)
|
||||||
|
{
|
||||||
|
int[] filled = new int[size];
|
||||||
|
indexed.fill(startIndex, filled);
|
||||||
|
|
||||||
|
for (int i = startIndex; i < filled.length; i++) {
|
||||||
|
Assert.assertEquals(vals[i + startIndex], filled[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertIndexMatchesVals()
|
||||||
|
{
|
||||||
|
Assert.assertEquals(vals.length, indexed.size());
|
||||||
|
|
||||||
|
// sequential access
|
||||||
|
int[] indices = new int[vals.length];
|
||||||
|
for (int i = 0; i < indexed.size(); ++i) {
|
||||||
|
Assert.assertEquals(vals[i], indexed.get(i), 0.0);
|
||||||
|
indices[i] = i;
|
||||||
|
}
|
||||||
|
|
||||||
|
Collections.shuffle(Arrays.asList(indices));
|
||||||
|
// random access
|
||||||
|
for (int i = 0; i < indexed.size(); ++i) {
|
||||||
|
int k = indices[i];
|
||||||
|
Assert.assertEquals(vals[k], indexed.get(k), 0.0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue