mirror of https://github.com/apache/druid.git
Merge pull request #1006 from metamx/fix-lzf-exceeding-chunksize
Tests for larger compression chunks + throw errors on decompression
This commit is contained in:
commit
2aa9e1bf3f
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
package io.druid.segment.data;
|
package io.druid.segment.data;
|
||||||
|
|
||||||
|
import com.google.common.base.Throwables;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
import com.metamx.common.logger.Logger;
|
import com.metamx.common.logger.Logger;
|
||||||
import com.ning.compress.BufferRecycler;
|
import com.ning.compress.BufferRecycler;
|
||||||
|
@ -215,9 +216,8 @@ public class CompressedObjectStrategy<T extends Buffer> implements ObjectStrateg
|
||||||
}
|
}
|
||||||
catch (IOException e) {
|
catch (IOException e) {
|
||||||
log.error(e, "Error compressing data");
|
log.error(e, "Error compressing data");
|
||||||
|
throw Throwables.propagate(e);
|
||||||
}
|
}
|
||||||
// IOException should be on ResourceHolder.close(), not encodeChunk, so this *should* never happen
|
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -21,6 +21,7 @@ package io.druid.segment.data;
|
||||||
|
|
||||||
import com.google.common.io.Closeables;
|
import com.google.common.io.Closeables;
|
||||||
import com.google.common.primitives.Floats;
|
import com.google.common.primitives.Floats;
|
||||||
|
import io.druid.segment.CompressedPools;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -34,6 +35,9 @@ import java.nio.ByteBuffer;
|
||||||
import java.nio.ByteOrder;
|
import java.nio.ByteOrder;
|
||||||
import java.nio.FloatBuffer;
|
import java.nio.FloatBuffer;
|
||||||
import java.nio.channels.Channels;
|
import java.nio.channels.Channels;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Random;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
@ -87,6 +91,11 @@ public class CompressedFloatsIndexedSupplierTest extends CompressionStrategyTest
|
||||||
0.0f, 0.1f, 0.2f, 0.3f, 0.4f, 0.5f, 0.6f, 0.7f, 0.8f, 0.9f, 0.10f, 0.11f, 0.12f, 0.13f, 0.14f, 0.15f, 0.16f
|
0.0f, 0.1f, 0.2f, 0.3f, 0.4f, 0.5f, 0.6f, 0.7f, 0.8f, 0.9f, 0.10f, 0.11f, 0.12f, 0.13f, 0.14f, 0.15f, 0.16f
|
||||||
};
|
};
|
||||||
|
|
||||||
|
makeWithSerde(chunkSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void makeWithSerde(int chunkSize) throws IOException
|
||||||
|
{
|
||||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||||
final CompressedFloatsIndexedSupplier theSupplier = CompressedFloatsIndexedSupplier.fromFloatBuffer(
|
final CompressedFloatsIndexedSupplier theSupplier = CompressedFloatsIndexedSupplier.fromFloatBuffer(
|
||||||
FloatBuffer.wrap(vals), chunkSize, ByteOrder.nativeOrder(), compressionStrategy
|
FloatBuffer.wrap(vals), chunkSize, ByteOrder.nativeOrder(), compressionStrategy
|
||||||
|
@ -100,28 +109,53 @@ public class CompressedFloatsIndexedSupplierTest extends CompressionStrategyTest
|
||||||
indexed = supplier.get();
|
indexed = supplier.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void setupLargeChunks(final int chunkSize, final int totalSize) throws IOException
|
||||||
|
{
|
||||||
|
vals = new float[totalSize];
|
||||||
|
Random rand = new Random(0);
|
||||||
|
for(int i = 0; i < vals.length; ++i) {
|
||||||
|
vals[i] = (float)rand.nextGaussian();
|
||||||
|
}
|
||||||
|
|
||||||
|
makeWithSerde(chunkSize);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSanity() throws Exception
|
public void testSanity() throws Exception
|
||||||
{
|
{
|
||||||
setupSimple(5);
|
setupSimple(5);
|
||||||
|
|
||||||
Assert.assertEquals(4, supplier.getBaseFloatBuffers().size());
|
Assert.assertEquals(4, supplier.getBaseFloatBuffers().size());
|
||||||
|
assertIndexMatchesVals();
|
||||||
Assert.assertEquals(vals.length, indexed.size());
|
|
||||||
for (int i = 0; i < indexed.size(); ++i) {
|
|
||||||
Assert.assertEquals(vals[i], indexed.get(i), 0.0);
|
|
||||||
}
|
|
||||||
|
|
||||||
// test powers of 2
|
// test powers of 2
|
||||||
setupSimple(2);
|
setupSimple(2);
|
||||||
|
|
||||||
Assert.assertEquals(9, supplier.getBaseFloatBuffers().size());
|
Assert.assertEquals(9, supplier.getBaseFloatBuffers().size());
|
||||||
|
assertIndexMatchesVals();
|
||||||
|
}
|
||||||
|
|
||||||
Assert.assertEquals(vals.length, indexed.size());
|
@Test
|
||||||
for (int i = 0; i < indexed.size(); ++i) {
|
public void testLargeChunks() throws Exception
|
||||||
Assert.assertEquals(vals[i], indexed.get(i), 0.0);
|
{
|
||||||
}
|
final int maxChunkSize = CompressedPools.BUFFER_SIZE / Floats.BYTES;
|
||||||
|
|
||||||
|
setupLargeChunks(maxChunkSize, 10 * maxChunkSize);
|
||||||
|
Assert.assertEquals(10, supplier.getBaseFloatBuffers().size());
|
||||||
|
assertIndexMatchesVals();
|
||||||
|
|
||||||
|
setupLargeChunks(maxChunkSize, 10 * maxChunkSize + 1);
|
||||||
|
Assert.assertEquals(11, supplier.getBaseFloatBuffers().size());
|
||||||
|
assertIndexMatchesVals();
|
||||||
|
|
||||||
|
setupLargeChunks(maxChunkSize - 1, 10 * (maxChunkSize - 1) + 1);
|
||||||
|
Assert.assertEquals(11, supplier.getBaseFloatBuffers().size());
|
||||||
|
assertIndexMatchesVals();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = IllegalArgumentException.class)
|
||||||
|
public void testChunkTooBig() throws Exception
|
||||||
|
{
|
||||||
|
final int maxChunkSize = CompressedPools.BUFFER_SIZE / Floats.BYTES;
|
||||||
|
setupLargeChunks(maxChunkSize + 1, 10 * (maxChunkSize + 1));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -149,20 +183,14 @@ public class CompressedFloatsIndexedSupplierTest extends CompressionStrategyTest
|
||||||
|
|
||||||
Assert.assertEquals(4, supplier.getBaseFloatBuffers().size());
|
Assert.assertEquals(4, supplier.getBaseFloatBuffers().size());
|
||||||
|
|
||||||
Assert.assertEquals(vals.length, indexed.size());
|
assertIndexMatchesVals();
|
||||||
for (int i = 0; i < indexed.size(); ++i) {
|
|
||||||
Assert.assertEquals(vals[i], indexed.get(i), 0.0);
|
|
||||||
}
|
|
||||||
|
|
||||||
// test powers of 2
|
// test powers of 2
|
||||||
setupSimpleWithSerde(2);
|
setupSimpleWithSerde(2);
|
||||||
|
|
||||||
Assert.assertEquals(9, supplier.getBaseFloatBuffers().size());
|
Assert.assertEquals(9, supplier.getBaseFloatBuffers().size());
|
||||||
|
|
||||||
Assert.assertEquals(vals.length, indexed.size());
|
assertIndexMatchesVals();
|
||||||
for (int i = 0; i < indexed.size(); ++i) {
|
|
||||||
Assert.assertEquals(vals[i], indexed.get(i), 0.0);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -296,4 +324,23 @@ public class CompressedFloatsIndexedSupplierTest extends CompressionStrategyTest
|
||||||
Assert.assertEquals(vals[i + startIndex], filled[i], 0.0);
|
Assert.assertEquals(vals[i + startIndex], filled[i], 0.0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void assertIndexMatchesVals()
|
||||||
|
{
|
||||||
|
Assert.assertEquals(vals.length, indexed.size());
|
||||||
|
|
||||||
|
// sequential access
|
||||||
|
int[] indices = new int[vals.length];
|
||||||
|
for (int i = 0; i < indexed.size(); ++i) {
|
||||||
|
Assert.assertEquals(vals[i], indexed.get(i), 0.0);
|
||||||
|
indices[i] = i;
|
||||||
|
}
|
||||||
|
|
||||||
|
Collections.shuffle(Arrays.asList(indices));
|
||||||
|
// random access
|
||||||
|
for (int i = 0; i < indexed.size(); ++i) {
|
||||||
|
int k = indices[i];
|
||||||
|
Assert.assertEquals(vals[k], indexed.get(k), 0.0);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ package io.druid.segment.data;
|
||||||
|
|
||||||
import com.google.common.primitives.Longs;
|
import com.google.common.primitives.Longs;
|
||||||
import com.metamx.common.guava.CloseQuietly;
|
import com.metamx.common.guava.CloseQuietly;
|
||||||
|
import io.druid.segment.CompressedPools;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -34,6 +35,9 @@ import java.nio.ByteBuffer;
|
||||||
import java.nio.ByteOrder;
|
import java.nio.ByteOrder;
|
||||||
import java.nio.LongBuffer;
|
import java.nio.LongBuffer;
|
||||||
import java.nio.channels.Channels;
|
import java.nio.channels.Channels;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Random;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
@ -65,13 +69,13 @@ public class CompressedLongsIndexedSupplierTest extends CompressionStrategyTest
|
||||||
CloseQuietly.close(indexed);
|
CloseQuietly.close(indexed);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setupSimple()
|
private void setupSimple(final int chunkSize)
|
||||||
{
|
{
|
||||||
vals = new long[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16};
|
vals = new long[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16};
|
||||||
|
|
||||||
supplier = CompressedLongsIndexedSupplier.fromLongBuffer(
|
supplier = CompressedLongsIndexedSupplier.fromLongBuffer(
|
||||||
LongBuffer.wrap(vals),
|
LongBuffer.wrap(vals),
|
||||||
5,
|
chunkSize,
|
||||||
ByteOrder.nativeOrder(),
|
ByteOrder.nativeOrder(),
|
||||||
compressionStrategy
|
compressionStrategy
|
||||||
);
|
);
|
||||||
|
@ -79,13 +83,18 @@ public class CompressedLongsIndexedSupplierTest extends CompressionStrategyTest
|
||||||
indexed = supplier.get();
|
indexed = supplier.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setupSimpleWithSerde() throws IOException
|
private void setupSimpleWithSerde(final int chunkSize) throws IOException
|
||||||
{
|
{
|
||||||
vals = new long[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16};
|
vals = new long[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16};
|
||||||
|
|
||||||
|
makeWithSerde(chunkSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void makeWithSerde(final int chunkSize) throws IOException
|
||||||
|
{
|
||||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||||
final CompressedLongsIndexedSupplier theSupplier = CompressedLongsIndexedSupplier.fromLongBuffer(
|
final CompressedLongsIndexedSupplier theSupplier = CompressedLongsIndexedSupplier.fromLongBuffer(
|
||||||
LongBuffer.wrap(vals), 5, ByteOrder.nativeOrder(), compressionStrategy
|
LongBuffer.wrap(vals), chunkSize, ByteOrder.nativeOrder(), compressionStrategy
|
||||||
);
|
);
|
||||||
theSupplier.writeToChannel(Channels.newChannel(baos));
|
theSupplier.writeToChannel(Channels.newChannel(baos));
|
||||||
|
|
||||||
|
@ -96,23 +105,64 @@ public class CompressedLongsIndexedSupplierTest extends CompressionStrategyTest
|
||||||
indexed = supplier.get();
|
indexed = supplier.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void setupLargeChunks(final int chunkSize, final int totalSize) throws IOException
|
||||||
|
{
|
||||||
|
vals = new long[totalSize];
|
||||||
|
Random rand = new Random(0);
|
||||||
|
for(int i = 0; i < vals.length; ++i) {
|
||||||
|
vals[i] = rand.nextLong();
|
||||||
|
}
|
||||||
|
|
||||||
|
makeWithSerde(chunkSize);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSanity() throws Exception
|
public void testSanity() throws Exception
|
||||||
{
|
{
|
||||||
setupSimple();
|
setupSimple(5);
|
||||||
|
|
||||||
Assert.assertEquals(4, supplier.getBaseLongBuffers().size());
|
Assert.assertEquals(4, supplier.getBaseLongBuffers().size());
|
||||||
|
assertIndexMatchesVals();
|
||||||
|
|
||||||
Assert.assertEquals(vals.length, indexed.size());
|
// test powers of 2
|
||||||
for (int i = 0; i < indexed.size(); ++i) {
|
setupSimple(4);
|
||||||
Assert.assertEquals(vals[i], indexed.get(i));
|
Assert.assertEquals(4, supplier.getBaseLongBuffers().size());
|
||||||
}
|
assertIndexMatchesVals();
|
||||||
|
|
||||||
|
setupSimple(32);
|
||||||
|
Assert.assertEquals(1, supplier.getBaseLongBuffers().size());
|
||||||
|
assertIndexMatchesVals();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLargeChunks() throws Exception
|
||||||
|
{
|
||||||
|
final int maxChunkSize = CompressedPools.BUFFER_SIZE / Longs.BYTES;
|
||||||
|
|
||||||
|
setupLargeChunks(maxChunkSize, 10 * maxChunkSize);
|
||||||
|
Assert.assertEquals(10, supplier.getBaseLongBuffers().size());
|
||||||
|
assertIndexMatchesVals();
|
||||||
|
|
||||||
|
setupLargeChunks(maxChunkSize, 10 * maxChunkSize + 1);
|
||||||
|
Assert.assertEquals(11, supplier.getBaseLongBuffers().size());
|
||||||
|
assertIndexMatchesVals();
|
||||||
|
|
||||||
|
setupLargeChunks(maxChunkSize - 1, 10 * (maxChunkSize - 1) + 1);
|
||||||
|
Assert.assertEquals(11, supplier.getBaseLongBuffers().size());
|
||||||
|
assertIndexMatchesVals();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = IllegalArgumentException.class)
|
||||||
|
public void testChunkTooBig() throws Exception
|
||||||
|
{
|
||||||
|
final int maxChunkSize = CompressedPools.BUFFER_SIZE / Longs.BYTES;
|
||||||
|
setupLargeChunks(maxChunkSize + 1, 10 * (maxChunkSize + 1));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBulkFill() throws Exception
|
public void testBulkFill() throws Exception
|
||||||
{
|
{
|
||||||
setupSimple();
|
setupSimple(5);
|
||||||
|
|
||||||
tryFill(0, 15);
|
tryFill(0, 15);
|
||||||
tryFill(3, 6);
|
tryFill(3, 6);
|
||||||
|
@ -123,27 +173,23 @@ public class CompressedLongsIndexedSupplierTest extends CompressionStrategyTest
|
||||||
@Test(expected = IndexOutOfBoundsException.class)
|
@Test(expected = IndexOutOfBoundsException.class)
|
||||||
public void testBulkFillTooMuch() throws Exception
|
public void testBulkFillTooMuch() throws Exception
|
||||||
{
|
{
|
||||||
setupSimple();
|
setupSimple(5);
|
||||||
tryFill(7, 10);
|
tryFill(7, 10);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSanityWithSerde() throws Exception
|
public void testSanityWithSerde() throws Exception
|
||||||
{
|
{
|
||||||
setupSimpleWithSerde();
|
setupSimpleWithSerde(5);
|
||||||
|
|
||||||
Assert.assertEquals(4, supplier.getBaseLongBuffers().size());
|
Assert.assertEquals(4, supplier.getBaseLongBuffers().size());
|
||||||
|
assertIndexMatchesVals();
|
||||||
Assert.assertEquals(vals.length, indexed.size());
|
|
||||||
for (int i = 0; i < indexed.size(); ++i) {
|
|
||||||
Assert.assertEquals(vals[i], indexed.get(i));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBulkFillWithSerde() throws Exception
|
public void testBulkFillWithSerde() throws Exception
|
||||||
{
|
{
|
||||||
setupSimpleWithSerde();
|
setupSimpleWithSerde(5);
|
||||||
|
|
||||||
tryFill(0, 15);
|
tryFill(0, 15);
|
||||||
tryFill(3, 6);
|
tryFill(3, 6);
|
||||||
|
@ -154,7 +200,7 @@ public class CompressedLongsIndexedSupplierTest extends CompressionStrategyTest
|
||||||
@Test(expected = IndexOutOfBoundsException.class)
|
@Test(expected = IndexOutOfBoundsException.class)
|
||||||
public void testBulkFillTooMuchWithSerde() throws Exception
|
public void testBulkFillTooMuchWithSerde() throws Exception
|
||||||
{
|
{
|
||||||
setupSimpleWithSerde();
|
setupSimpleWithSerde(5);
|
||||||
tryFill(7, 10);
|
tryFill(7, 10);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -163,7 +209,7 @@ public class CompressedLongsIndexedSupplierTest extends CompressionStrategyTest
|
||||||
@Test
|
@Test
|
||||||
public void testConcurrentThreadReads() throws Exception
|
public void testConcurrentThreadReads() throws Exception
|
||||||
{
|
{
|
||||||
setupSimple();
|
setupSimple(5);
|
||||||
|
|
||||||
final AtomicReference<String> reason = new AtomicReference<String>("none");
|
final AtomicReference<String> reason = new AtomicReference<String>("none");
|
||||||
|
|
||||||
|
@ -271,4 +317,23 @@ public class CompressedLongsIndexedSupplierTest extends CompressionStrategyTest
|
||||||
Assert.assertEquals(vals[i + startIndex], filled[i]);
|
Assert.assertEquals(vals[i + startIndex], filled[i]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void assertIndexMatchesVals()
|
||||||
|
{
|
||||||
|
Assert.assertEquals(vals.length, indexed.size());
|
||||||
|
|
||||||
|
// sequential access
|
||||||
|
int[] indices = new int[vals.length];
|
||||||
|
for (int i = 0; i < indexed.size(); ++i) {
|
||||||
|
Assert.assertEquals(vals[i], indexed.get(i), 0.0);
|
||||||
|
indices[i] = i;
|
||||||
|
}
|
||||||
|
|
||||||
|
Collections.shuffle(Arrays.asList(indices));
|
||||||
|
// random access
|
||||||
|
for (int i = 0; i < indexed.size(); ++i) {
|
||||||
|
int k = indices[i];
|
||||||
|
Assert.assertEquals(vals[k], indexed.get(k), 0.0);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue