diff --git a/processing/src/main/java/io/druid/segment/data/CompressedObjectStrategy.java b/processing/src/main/java/io/druid/segment/data/CompressedObjectStrategy.java index 9144f1a3b2b..17703f10104 100644 --- a/processing/src/main/java/io/druid/segment/data/CompressedObjectStrategy.java +++ b/processing/src/main/java/io/druid/segment/data/CompressedObjectStrategy.java @@ -19,6 +19,7 @@ package io.druid.segment.data; +import com.google.common.base.Throwables; import com.google.common.collect.Maps; import com.metamx.common.logger.Logger; import com.ning.compress.BufferRecycler; @@ -215,9 +216,8 @@ public class CompressedObjectStrategy implements ObjectStrateg } catch (IOException e) { log.error(e, "Error compressing data"); + throw Throwables.propagate(e); } - // IOException should be on ResourceHolder.close(), not encodeChunk, so this *should* never happen - return null; } } diff --git a/processing/src/test/java/io/druid/segment/data/CompressedFloatsIndexedSupplierTest.java b/processing/src/test/java/io/druid/segment/data/CompressedFloatsIndexedSupplierTest.java index c1a55700b05..5e5a00985e3 100644 --- a/processing/src/test/java/io/druid/segment/data/CompressedFloatsIndexedSupplierTest.java +++ b/processing/src/test/java/io/druid/segment/data/CompressedFloatsIndexedSupplierTest.java @@ -21,6 +21,7 @@ package io.druid.segment.data; import com.google.common.io.Closeables; import com.google.common.primitives.Floats; +import io.druid.segment.CompressedPools; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -34,6 +35,9 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.FloatBuffer; import java.nio.channels.Channels; +import java.util.Arrays; +import java.util.Collections; +import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -87,6 +91,11 @@ public class CompressedFloatsIndexedSupplierTest extends CompressionStrategyTest 0.0f, 0.1f, 0.2f, 0.3f, 0.4f, 0.5f, 0.6f, 0.7f, 0.8f, 0.9f, 0.10f, 0.11f, 0.12f, 0.13f, 0.14f, 0.15f, 0.16f }; + makeWithSerde(chunkSize); + } + + private void makeWithSerde(int chunkSize) throws IOException + { ByteArrayOutputStream baos = new ByteArrayOutputStream(); final CompressedFloatsIndexedSupplier theSupplier = CompressedFloatsIndexedSupplier.fromFloatBuffer( FloatBuffer.wrap(vals), chunkSize, ByteOrder.nativeOrder(), compressionStrategy @@ -100,28 +109,53 @@ public class CompressedFloatsIndexedSupplierTest extends CompressionStrategyTest indexed = supplier.get(); } + private void setupLargeChunks(final int chunkSize, final int totalSize) throws IOException + { + vals = new float[totalSize]; + Random rand = new Random(0); + for(int i = 0; i < vals.length; ++i) { + vals[i] = (float)rand.nextGaussian(); + } + + makeWithSerde(chunkSize); + } + @Test public void testSanity() throws Exception { setupSimple(5); - Assert.assertEquals(4, supplier.getBaseFloatBuffers().size()); - - Assert.assertEquals(vals.length, indexed.size()); - for (int i = 0; i < indexed.size(); ++i) { - Assert.assertEquals(vals[i], indexed.get(i), 0.0); - } + assertIndexMatchesVals(); // test powers of 2 setupSimple(2); - Assert.assertEquals(9, supplier.getBaseFloatBuffers().size()); + assertIndexMatchesVals(); + } - Assert.assertEquals(vals.length, indexed.size()); - for (int i = 0; i < indexed.size(); ++i) { - Assert.assertEquals(vals[i], indexed.get(i), 0.0); - } + @Test + public void testLargeChunks() throws Exception + { + final int maxChunkSize = CompressedPools.BUFFER_SIZE / Floats.BYTES; + setupLargeChunks(maxChunkSize, 10 * maxChunkSize); + Assert.assertEquals(10, supplier.getBaseFloatBuffers().size()); + assertIndexMatchesVals(); + + setupLargeChunks(maxChunkSize, 10 * maxChunkSize + 1); + Assert.assertEquals(11, supplier.getBaseFloatBuffers().size()); + assertIndexMatchesVals(); + + setupLargeChunks(maxChunkSize - 1, 10 * (maxChunkSize - 1) + 1); + Assert.assertEquals(11, supplier.getBaseFloatBuffers().size()); + assertIndexMatchesVals(); + } + + @Test(expected = IllegalArgumentException.class) + public void testChunkTooBig() throws Exception + { + final int maxChunkSize = CompressedPools.BUFFER_SIZE / Floats.BYTES; + setupLargeChunks(maxChunkSize + 1, 10 * (maxChunkSize + 1)); } @Test @@ -149,20 +183,14 @@ public class CompressedFloatsIndexedSupplierTest extends CompressionStrategyTest Assert.assertEquals(4, supplier.getBaseFloatBuffers().size()); - Assert.assertEquals(vals.length, indexed.size()); - for (int i = 0; i < indexed.size(); ++i) { - Assert.assertEquals(vals[i], indexed.get(i), 0.0); - } + assertIndexMatchesVals(); // test powers of 2 setupSimpleWithSerde(2); Assert.assertEquals(9, supplier.getBaseFloatBuffers().size()); - Assert.assertEquals(vals.length, indexed.size()); - for (int i = 0; i < indexed.size(); ++i) { - Assert.assertEquals(vals[i], indexed.get(i), 0.0); - } + assertIndexMatchesVals(); } @Test @@ -296,4 +324,23 @@ public class CompressedFloatsIndexedSupplierTest extends CompressionStrategyTest Assert.assertEquals(vals[i + startIndex], filled[i], 0.0); } } + + private void assertIndexMatchesVals() + { + Assert.assertEquals(vals.length, indexed.size()); + + // sequential access + int[] indices = new int[vals.length]; + for (int i = 0; i < indexed.size(); ++i) { + Assert.assertEquals(vals[i], indexed.get(i), 0.0); + indices[i] = i; + } + + Collections.shuffle(Arrays.asList(indices)); + // random access + for (int i = 0; i < indexed.size(); ++i) { + int k = indices[i]; + Assert.assertEquals(vals[k], indexed.get(k), 0.0); + } + } } diff --git a/processing/src/test/java/io/druid/segment/data/CompressedLongsIndexedSupplierTest.java b/processing/src/test/java/io/druid/segment/data/CompressedLongsIndexedSupplierTest.java index b251134ddc7..d8bedf1d988 100644 --- a/processing/src/test/java/io/druid/segment/data/CompressedLongsIndexedSupplierTest.java +++ b/processing/src/test/java/io/druid/segment/data/CompressedLongsIndexedSupplierTest.java @@ -21,6 +21,7 @@ package io.druid.segment.data; import com.google.common.primitives.Longs; import com.metamx.common.guava.CloseQuietly; +import io.druid.segment.CompressedPools; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -34,6 +35,9 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.LongBuffer; import java.nio.channels.Channels; +import java.util.Arrays; +import java.util.Collections; +import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -65,13 +69,13 @@ public class CompressedLongsIndexedSupplierTest extends CompressionStrategyTest CloseQuietly.close(indexed); } - private void setupSimple() + private void setupSimple(final int chunkSize) { vals = new long[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16}; supplier = CompressedLongsIndexedSupplier.fromLongBuffer( LongBuffer.wrap(vals), - 5, + chunkSize, ByteOrder.nativeOrder(), compressionStrategy ); @@ -79,13 +83,18 @@ public class CompressedLongsIndexedSupplierTest extends CompressionStrategyTest indexed = supplier.get(); } - private void setupSimpleWithSerde() throws IOException + private void setupSimpleWithSerde(final int chunkSize) throws IOException { vals = new long[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16}; + makeWithSerde(chunkSize); + } + + private void makeWithSerde(final int chunkSize) throws IOException + { ByteArrayOutputStream baos = new ByteArrayOutputStream(); final CompressedLongsIndexedSupplier theSupplier = CompressedLongsIndexedSupplier.fromLongBuffer( - LongBuffer.wrap(vals), 5, ByteOrder.nativeOrder(), compressionStrategy + LongBuffer.wrap(vals), chunkSize, ByteOrder.nativeOrder(), compressionStrategy ); theSupplier.writeToChannel(Channels.newChannel(baos)); @@ -96,23 +105,64 @@ public class CompressedLongsIndexedSupplierTest extends CompressionStrategyTest indexed = supplier.get(); } + private void setupLargeChunks(final int chunkSize, final int totalSize) throws IOException + { + vals = new long[totalSize]; + Random rand = new Random(0); + for(int i = 0; i < vals.length; ++i) { + vals[i] = rand.nextLong(); + } + + makeWithSerde(chunkSize); + } + @Test public void testSanity() throws Exception { - setupSimple(); + setupSimple(5); Assert.assertEquals(4, supplier.getBaseLongBuffers().size()); + assertIndexMatchesVals(); - Assert.assertEquals(vals.length, indexed.size()); - for (int i = 0; i < indexed.size(); ++i) { - Assert.assertEquals(vals[i], indexed.get(i)); - } + // test powers of 2 + setupSimple(4); + Assert.assertEquals(4, supplier.getBaseLongBuffers().size()); + assertIndexMatchesVals(); + + setupSimple(32); + Assert.assertEquals(1, supplier.getBaseLongBuffers().size()); + assertIndexMatchesVals(); + } + + @Test + public void testLargeChunks() throws Exception + { + final int maxChunkSize = CompressedPools.BUFFER_SIZE / Longs.BYTES; + + setupLargeChunks(maxChunkSize, 10 * maxChunkSize); + Assert.assertEquals(10, supplier.getBaseLongBuffers().size()); + assertIndexMatchesVals(); + + setupLargeChunks(maxChunkSize, 10 * maxChunkSize + 1); + Assert.assertEquals(11, supplier.getBaseLongBuffers().size()); + assertIndexMatchesVals(); + + setupLargeChunks(maxChunkSize - 1, 10 * (maxChunkSize - 1) + 1); + Assert.assertEquals(11, supplier.getBaseLongBuffers().size()); + assertIndexMatchesVals(); + } + + @Test(expected = IllegalArgumentException.class) + public void testChunkTooBig() throws Exception + { + final int maxChunkSize = CompressedPools.BUFFER_SIZE / Longs.BYTES; + setupLargeChunks(maxChunkSize + 1, 10 * (maxChunkSize + 1)); } @Test public void testBulkFill() throws Exception { - setupSimple(); + setupSimple(5); tryFill(0, 15); tryFill(3, 6); @@ -123,27 +173,23 @@ public class CompressedLongsIndexedSupplierTest extends CompressionStrategyTest @Test(expected = IndexOutOfBoundsException.class) public void testBulkFillTooMuch() throws Exception { - setupSimple(); + setupSimple(5); tryFill(7, 10); } @Test public void testSanityWithSerde() throws Exception { - setupSimpleWithSerde(); + setupSimpleWithSerde(5); Assert.assertEquals(4, supplier.getBaseLongBuffers().size()); - - Assert.assertEquals(vals.length, indexed.size()); - for (int i = 0; i < indexed.size(); ++i) { - Assert.assertEquals(vals[i], indexed.get(i)); - } + assertIndexMatchesVals(); } @Test public void testBulkFillWithSerde() throws Exception { - setupSimpleWithSerde(); + setupSimpleWithSerde(5); tryFill(0, 15); tryFill(3, 6); @@ -154,7 +200,7 @@ public class CompressedLongsIndexedSupplierTest extends CompressionStrategyTest @Test(expected = IndexOutOfBoundsException.class) public void testBulkFillTooMuchWithSerde() throws Exception { - setupSimpleWithSerde(); + setupSimpleWithSerde(5); tryFill(7, 10); } @@ -163,7 +209,7 @@ public class CompressedLongsIndexedSupplierTest extends CompressionStrategyTest @Test public void testConcurrentThreadReads() throws Exception { - setupSimple(); + setupSimple(5); final AtomicReference reason = new AtomicReference("none"); @@ -271,4 +317,23 @@ public class CompressedLongsIndexedSupplierTest extends CompressionStrategyTest Assert.assertEquals(vals[i + startIndex], filled[i]); } } + + private void assertIndexMatchesVals() + { + Assert.assertEquals(vals.length, indexed.size()); + + // sequential access + int[] indices = new int[vals.length]; + for (int i = 0; i < indexed.size(); ++i) { + Assert.assertEquals(vals[i], indexed.get(i), 0.0); + indices[i] = i; + } + + Collections.shuffle(Arrays.asList(indices)); + // random access + for (int i = 0; i < indexed.size(); ++i) { + int k = indices[i]; + Assert.assertEquals(vals[k], indexed.get(k), 0.0); + } + } }