diff --git a/processing/src/main/java/io/druid/segment/CompressedPools.java b/processing/src/main/java/io/druid/segment/CompressedPools.java index cccf216367e..2c9236396a2 100644 --- a/processing/src/main/java/io/druid/segment/CompressedPools.java +++ b/processing/src/main/java/io/druid/segment/CompressedPools.java @@ -35,6 +35,7 @@ public class CompressedPools { private static final Logger log = new Logger(CompressedPools.class); + public static final int BUFFER_SIZE = 0x10000; private static final StupidPool chunkEncoderPool = new StupidPool( new Supplier() { @@ -44,7 +45,7 @@ public class CompressedPools public ChunkEncoder get() { log.info("Allocating new chunkEncoder[%,d]", counter.incrementAndGet()); - return new ChunkEncoder(0xFFFF); + return new ChunkEncoder(BUFFER_SIZE); } } ); @@ -63,7 +64,7 @@ public class CompressedPools public byte[] get() { log.info("Allocating new outputBytesPool[%,d]", counter.incrementAndGet()); - return new byte[0xFFFF]; + return new byte[BUFFER_SIZE]; } } ); @@ -82,7 +83,7 @@ public class CompressedPools public ByteBuffer get() { log.info("Allocating new bigEndByteBuf[%,d]", counter.incrementAndGet()); - return ByteBuffer.allocateDirect(0xFFFF).order(ByteOrder.BIG_ENDIAN); + return ByteBuffer.allocateDirect(BUFFER_SIZE).order(ByteOrder.BIG_ENDIAN); } } ); @@ -96,7 +97,7 @@ public class CompressedPools public ByteBuffer get() { log.info("Allocating new littleEndByteBuf[%,d]", counter.incrementAndGet()); - return ByteBuffer.allocateDirect(0xFFFF).order(ByteOrder.LITTLE_ENDIAN); + return ByteBuffer.allocateDirect(BUFFER_SIZE).order(ByteOrder.LITTLE_ENDIAN); } } ); diff --git a/processing/src/main/java/io/druid/segment/data/CompressedFloatsIndexedSupplier.java b/processing/src/main/java/io/druid/segment/data/CompressedFloatsIndexedSupplier.java index 58f57d1191a..f5c1e051f15 100644 --- a/processing/src/main/java/io/druid/segment/data/CompressedFloatsIndexedSupplier.java +++ b/processing/src/main/java/io/druid/segment/data/CompressedFloatsIndexedSupplier.java @@ -28,6 +28,7 @@ import com.metamx.common.IAE; import com.metamx.common.guava.CloseQuietly; import io.druid.collections.ResourceHolder; import io.druid.collections.StupidResourceHolder; +import io.druid.segment.CompressedPools; import java.io.IOException; import java.nio.ByteBuffer; @@ -42,7 +43,7 @@ public class CompressedFloatsIndexedSupplier implements Supplier { public static final byte LZF_VERSION = 0x1; public static final byte version = 0x2; - public static final int MAX_FLOATS_IN_BUFFER = (0xFFFF >> 2); + public static final int MAX_FLOATS_IN_BUFFER = CompressedPools.BUFFER_SIZE / Floats.BYTES; private final int totalSize; private final int sizePer; @@ -70,87 +71,28 @@ public class CompressedFloatsIndexedSupplier implements Supplier @Override public IndexedFloats get() { - return new IndexedFloats() - { - int currIndex = -1; - ResourceHolder holder; - FloatBuffer buffer; + final int div = Integer.numberOfTrailingZeros(sizePer); + final int rem = sizePer - 1; + final boolean powerOf2 = sizePer == (1 << div); + if(powerOf2) { + return new CompressedIndexedFloats() { + @Override + public float get(int index) + { + // optimize division and remainder for powers of 2 + final int bufferNum = index >> div; - @Override - public int size() - { - return totalSize; - } - - @Override - public float get(int index) - { - int bufferNum = index / sizePer; - int bufferIndex = index % sizePer; - - if (bufferNum != currIndex) { - loadBuffer(bufferNum); - } - - return buffer.get(buffer.position() + bufferIndex); - } - - @Override - public void fill(int index, float[] toFill) - { - if (totalSize - index < toFill.length) { - throw new IndexOutOfBoundsException( - String.format( - "Cannot fill array of size[%,d] at index[%,d]. Max size[%,d]", toFill.length, index, totalSize - ) - ); - } - - int bufferNum = index / sizePer; - int bufferIndex = index % sizePer; - - int leftToFill = toFill.length; - while (leftToFill > 0) { if (bufferNum != currIndex) { loadBuffer(bufferNum); } - buffer.mark(); - buffer.position(buffer.position() + bufferIndex); - final int numToGet = Math.min(buffer.remaining(), leftToFill); - buffer.get(toFill, toFill.length - leftToFill, numToGet); - buffer.reset(); - leftToFill -= numToGet; - ++bufferNum; - bufferIndex = 0; + final int bufferIndex = index & rem; + return buffer.get(buffer.position() + bufferIndex); } - } - - private void loadBuffer(int bufferNum) - { - CloseQuietly.close(holder); - holder = baseFloatBuffers.get(bufferNum); - buffer = holder.get(); - currIndex = bufferNum; - } - - @Override - public String toString() - { - return "CompressedFloatsIndexedSupplier_Anonymous{" + - "currIndex=" + currIndex + - ", sizePer=" + sizePer + - ", numChunks=" + baseFloatBuffers.size() + - ", totalSize=" + totalSize + - '}'; - } - - @Override - public void close() throws IOException - { - Closeables.close(holder, false); - } - }; + }; + } else { + return new CompressedIndexedFloats(); + } } public long getSerializedSize() @@ -185,11 +127,6 @@ public class CompressedFloatsIndexedSupplier implements Supplier return baseFloatBuffers; } - public static int numFloatsInBuffer(int numFloatsInChunk) - { - return MAX_FLOATS_IN_BUFFER - (MAX_FLOATS_IN_BUFFER % numFloatsInChunk); - } - public static CompressedFloatsIndexedSupplier fromByteBuffer(ByteBuffer buffer, ByteOrder order) { byte versionFromBuffer = buffer.get(); @@ -245,7 +182,7 @@ public class CompressedFloatsIndexedSupplier implements Supplier ) { Preconditions.checkArgument( - chunkFactor * Floats.BYTES <= 0xffff, "Chunks must be <= 64k bytes. chunkFactor was[%s]", chunkFactor + chunkFactor <= MAX_FLOATS_IN_BUFFER, "Chunks must be <= 64k bytes. chunkFactor was[%s]", chunkFactor ); return new CompressedFloatsIndexedSupplier( @@ -294,4 +231,85 @@ public class CompressedFloatsIndexedSupplier implements Supplier ); } + private class CompressedIndexedFloats implements IndexedFloats + { + int currIndex = -1; + ResourceHolder holder; + FloatBuffer buffer; + + @Override + public int size() + { + return totalSize; + } + + @Override + public float get(final int index) + { + // division + remainder is optimized by the compiler so keep those together + final int bufferNum = index / sizePer; + final int bufferIndex = index % sizePer; + + if (bufferNum != currIndex) { + loadBuffer(bufferNum); + } + return buffer.get(buffer.position() + bufferIndex); + } + + @Override + public void fill(int index, float[] toFill) + { + if (totalSize - index < toFill.length) { + throw new IndexOutOfBoundsException( + String.format( + "Cannot fill array of size[%,d] at index[%,d]. Max size[%,d]", toFill.length, index, totalSize + ) + ); + } + + int bufferNum = index / sizePer; + int bufferIndex = index % sizePer; + + int leftToFill = toFill.length; + while (leftToFill > 0) { + if (bufferNum != currIndex) { + loadBuffer(bufferNum); + } + + buffer.mark(); + buffer.position(buffer.position() + bufferIndex); + final int numToGet = Math.min(buffer.remaining(), leftToFill); + buffer.get(toFill, toFill.length - leftToFill, numToGet); + buffer.reset(); + leftToFill -= numToGet; + ++bufferNum; + bufferIndex = 0; + } + } + + protected void loadBuffer(int bufferNum) + { + CloseQuietly.close(holder); + holder = baseFloatBuffers.get(bufferNum); + buffer = holder.get(); + currIndex = bufferNum; + } + + @Override + public String toString() + { + return "CompressedFloatsIndexedSupplier_Anonymous{" + + "currIndex=" + currIndex + + ", sizePer=" + sizePer + + ", numChunks=" + baseFloatBuffers.size() + + ", totalSize=" + totalSize + + '}'; + } + + @Override + public void close() throws IOException + { + Closeables.close(holder, false); + } + } } diff --git a/processing/src/main/java/io/druid/segment/data/CompressedLongsIndexedSupplier.java b/processing/src/main/java/io/druid/segment/data/CompressedLongsIndexedSupplier.java index 578712d16f3..dc3e2149584 100644 --- a/processing/src/main/java/io/druid/segment/data/CompressedLongsIndexedSupplier.java +++ b/processing/src/main/java/io/druid/segment/data/CompressedLongsIndexedSupplier.java @@ -28,6 +28,7 @@ import com.metamx.common.IAE; import com.metamx.common.guava.CloseQuietly; import io.druid.collections.ResourceHolder; import io.druid.collections.StupidResourceHolder; +import io.druid.segment.CompressedPools; import java.io.IOException; import java.nio.ByteBuffer; @@ -42,6 +43,8 @@ public class CompressedLongsIndexedSupplier implements Supplier { public static final byte LZF_VERSION = 0x1; public static final byte version = 0x2; + public static final int MAX_LONGS_IN_BUFFER = CompressedPools.BUFFER_SIZE / Longs.BYTES; + private final int totalSize; private final int sizePer; @@ -69,99 +72,28 @@ public class CompressedLongsIndexedSupplier implements Supplier @Override public IndexedLongs get() { - return new IndexedLongs() - { - int currIndex = -1; - ResourceHolder holder; - LongBuffer buffer; + final int div = Integer.numberOfTrailingZeros(sizePer); + final int rem = sizePer - 1; + final boolean powerOf2 = sizePer == (1 << div); + if(powerOf2) { + return new CompressedIndexedLongs() { + @Override + public long get(int index) + { + // optimize division and remainder for powers of 2 + final int bufferNum = index >> div; - @Override - public int size() - { - return totalSize; - } - - @Override - public long get(int index) - { - int bufferNum = index / sizePer; - int bufferIndex = index % sizePer; - - if (bufferNum != currIndex) { - loadBuffer(bufferNum); - } - - return buffer.get(buffer.position() + bufferIndex); - } - - @Override - public void fill(int index, long[] toFill) - { - if (totalSize - index < toFill.length) { - throw new IndexOutOfBoundsException( - String.format( - "Cannot fill array of size[%,d] at index[%,d]. Max size[%,d]", toFill.length, index, totalSize - ) - ); - } - - int bufferNum = index / sizePer; - int bufferIndex = index % sizePer; - - int leftToFill = toFill.length; - while (leftToFill > 0) { if (bufferNum != currIndex) { loadBuffer(bufferNum); } - buffer.mark(); - buffer.position(buffer.position() + bufferIndex); - final int numToGet = Math.min(buffer.remaining(), leftToFill); - buffer.get(toFill, toFill.length - leftToFill, numToGet); - buffer.reset(); - leftToFill -= numToGet; - ++bufferNum; - bufferIndex = 0; + final int bufferIndex = index & rem; + return buffer.get(buffer.position() + bufferIndex); } - } - - private void loadBuffer(int bufferNum) - { - CloseQuietly.close(holder); - holder = baseLongBuffers.get(bufferNum); - buffer = holder.get(); - currIndex = bufferNum; - } - - @Override - public int binarySearch(long key) - { - throw new UnsupportedOperationException(); - } - - @Override - public int binarySearch(long key, int from, int to) - { - throw new UnsupportedOperationException(); - } - - @Override - public String toString() - { - return "CompressedLongsIndexedSupplier_Anonymous{" + - "currIndex=" + currIndex + - ", sizePer=" + sizePer + - ", numChunks=" + baseLongBuffers.size() + - ", totalSize=" + totalSize + - '}'; - } - - @Override - public void close() throws IOException - { - Closeables.close(holder, false); - } - }; + }; + } else { + return new CompressedIndexedLongs(); + } } public long getSerializedSize() @@ -227,7 +159,7 @@ public class CompressedLongsIndexedSupplier implements Supplier public static CompressedLongsIndexedSupplier fromLongBuffer(LongBuffer buffer, final ByteOrder byteOrder, CompressedObjectStrategy.CompressionStrategy compression) { - return fromLongBuffer(buffer, 0xFFFF / Longs.BYTES, byteOrder, compression); + return fromLongBuffer(buffer, MAX_LONGS_IN_BUFFER, byteOrder, compression); } public static CompressedLongsIndexedSupplier fromLongBuffer( @@ -235,7 +167,7 @@ public class CompressedLongsIndexedSupplier implements Supplier ) { Preconditions.checkArgument( - chunkFactor * Longs.BYTES <= 0xffff, "Chunks must be <= 64k bytes. chunkFactor was[%s]", chunkFactor + chunkFactor <= MAX_LONGS_IN_BUFFER, "Chunks must be <= 64k bytes. chunkFactor was[%s]", chunkFactor ); return new CompressedLongsIndexedSupplier( @@ -284,4 +216,97 @@ public class CompressedLongsIndexedSupplier implements Supplier ); } + private class CompressedIndexedLongs implements IndexedLongs + { + int currIndex = -1; + ResourceHolder holder; + LongBuffer buffer; + + @Override + public int size() + { + return totalSize; + } + + @Override + public long get(int index) + { + final int bufferNum = index / sizePer; + final int bufferIndex = index % sizePer; + + if (bufferNum != currIndex) { + loadBuffer(bufferNum); + } + + return buffer.get(buffer.position() + bufferIndex); + } + + @Override + public void fill(int index, long[] toFill) + { + if (totalSize - index < toFill.length) { + throw new IndexOutOfBoundsException( + String.format( + "Cannot fill array of size[%,d] at index[%,d]. Max size[%,d]", toFill.length, index, totalSize + ) + ); + } + + int bufferNum = index / sizePer; + int bufferIndex = index % sizePer; + + int leftToFill = toFill.length; + while (leftToFill > 0) { + if (bufferNum != currIndex) { + loadBuffer(bufferNum); + } + + buffer.mark(); + buffer.position(buffer.position() + bufferIndex); + final int numToGet = Math.min(buffer.remaining(), leftToFill); + buffer.get(toFill, toFill.length - leftToFill, numToGet); + buffer.reset(); + leftToFill -= numToGet; + ++bufferNum; + bufferIndex = 0; + } + } + + protected void loadBuffer(int bufferNum) + { + CloseQuietly.close(holder); + holder = baseLongBuffers.get(bufferNum); + buffer = holder.get(); + currIndex = bufferNum; + } + + @Override + public int binarySearch(long key) + { + throw new UnsupportedOperationException(); + } + + @Override + public int binarySearch(long key, int from, int to) + { + throw new UnsupportedOperationException(); + } + + @Override + public String toString() + { + return "CompressedLongsIndexedSupplier_Anonymous{" + + "currIndex=" + currIndex + + ", sizePer=" + sizePer + + ", numChunks=" + baseLongBuffers.size() + + ", totalSize=" + totalSize + + '}'; + } + + @Override + public void close() throws IOException + { + Closeables.close(holder, false); + } + } } diff --git a/processing/src/main/java/io/druid/segment/data/CompressedLongsSupplierSerializer.java b/processing/src/main/java/io/druid/segment/data/CompressedLongsSupplierSerializer.java index cf04cc44767..b1e3c99f614 100644 --- a/processing/src/main/java/io/druid/segment/data/CompressedLongsSupplierSerializer.java +++ b/processing/src/main/java/io/druid/segment/data/CompressedLongsSupplierSerializer.java @@ -25,6 +25,7 @@ import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; import io.druid.collections.ResourceHolder; import io.druid.collections.StupidResourceHolder; +import io.druid.segment.CompressedPools; import java.io.IOException; import java.io.OutputStream; @@ -39,11 +40,10 @@ public class CompressedLongsSupplierSerializer IOPeon ioPeon, final String filenameBase, final ByteOrder order, final CompressedObjectStrategy.CompressionStrategy compression ) throws IOException { - final int sizePer = 0xFFFF / Longs.BYTES; final CompressedLongsSupplierSerializer retVal = new CompressedLongsSupplierSerializer( - sizePer, + CompressedLongsIndexedSupplier.MAX_LONGS_IN_BUFFER, new GenericIndexedWriter>( - ioPeon, filenameBase, CompressedLongBufferObjectStrategy.getBufferForOrder(order, compression, sizePer) + ioPeon, filenameBase, CompressedLongBufferObjectStrategy.getBufferForOrder(order, compression, CompressedLongsIndexedSupplier.MAX_LONGS_IN_BUFFER) ), compression ); diff --git a/processing/src/test/java/io/druid/segment/data/CompressedFloatsIndexedSupplierTest.java b/processing/src/test/java/io/druid/segment/data/CompressedFloatsIndexedSupplierTest.java index 0982f76f8bf..c1a55700b05 100644 --- a/processing/src/test/java/io/druid/segment/data/CompressedFloatsIndexedSupplierTest.java +++ b/processing/src/test/java/io/druid/segment/data/CompressedFloatsIndexedSupplierTest.java @@ -65,15 +65,15 @@ public class CompressedFloatsIndexedSupplierTest extends CompressionStrategyTest Closeables.close(indexed, false); } - private void setupSimple() + private void setupSimple(final int chunkSize) { vals = new float[]{ - 0.0f, 0.1f, 0.2f, 0.3f, 0.4f, 0.5f, 0.6f, 0.7f, 0.8f, 0.9f, 0.10f, 0.11f, 0.12f, 0.13f, 0.14f, 0.15f + 0.0f, 0.1f, 0.2f, 0.3f, 0.4f, 0.5f, 0.6f, 0.7f, 0.8f, 0.9f, 0.10f, 0.11f, 0.12f, 0.13f, 0.14f, 0.15f, 0.16f }; supplier = CompressedFloatsIndexedSupplier.fromFloatBuffer( FloatBuffer.wrap(vals), - 5, + chunkSize, ByteOrder.nativeOrder(), compressionStrategy ); @@ -81,15 +81,15 @@ public class CompressedFloatsIndexedSupplierTest extends CompressionStrategyTest indexed = supplier.get(); } - private void setupSimpleWithSerde() throws IOException + private void setupSimpleWithSerde(final int chunkSize) throws IOException { vals = new float[]{ - 0.0f, 0.1f, 0.2f, 0.3f, 0.4f, 0.5f, 0.6f, 0.7f, 0.8f, 0.9f, 0.10f, 0.11f, 0.12f, 0.13f, 0.14f, 0.15f + 0.0f, 0.1f, 0.2f, 0.3f, 0.4f, 0.5f, 0.6f, 0.7f, 0.8f, 0.9f, 0.10f, 0.11f, 0.12f, 0.13f, 0.14f, 0.15f, 0.16f }; ByteArrayOutputStream baos = new ByteArrayOutputStream(); final CompressedFloatsIndexedSupplier theSupplier = CompressedFloatsIndexedSupplier.fromFloatBuffer( - FloatBuffer.wrap(vals), 5, ByteOrder.nativeOrder(), compressionStrategy + FloatBuffer.wrap(vals), chunkSize, ByteOrder.nativeOrder(), compressionStrategy ); theSupplier.writeToChannel(Channels.newChannel(baos)); @@ -103,7 +103,7 @@ public class CompressedFloatsIndexedSupplierTest extends CompressionStrategyTest @Test public void testSanity() throws Exception { - setupSimple(); + setupSimple(5); Assert.assertEquals(4, supplier.getBaseFloatBuffers().size()); @@ -111,12 +111,23 @@ public class CompressedFloatsIndexedSupplierTest extends CompressionStrategyTest for (int i = 0; i < indexed.size(); ++i) { Assert.assertEquals(vals[i], indexed.get(i), 0.0); } + + // test powers of 2 + setupSimple(2); + + Assert.assertEquals(9, supplier.getBaseFloatBuffers().size()); + + Assert.assertEquals(vals.length, indexed.size()); + for (int i = 0; i < indexed.size(); ++i) { + Assert.assertEquals(vals[i], indexed.get(i), 0.0); + } + } @Test public void testBulkFill() throws Exception { - setupSimple(); + setupSimple(5); tryFill(0, 15); tryFill(3, 6); @@ -127,14 +138,14 @@ public class CompressedFloatsIndexedSupplierTest extends CompressionStrategyTest @Test(expected = IndexOutOfBoundsException.class) public void testBulkFillTooMuch() throws Exception { - setupSimple(); - tryFill(7, 10); + setupSimple(5); + tryFill(7, 11); } @Test public void testSanityWithSerde() throws Exception { - setupSimpleWithSerde(); + setupSimpleWithSerde(5); Assert.assertEquals(4, supplier.getBaseFloatBuffers().size()); @@ -142,12 +153,22 @@ public class CompressedFloatsIndexedSupplierTest extends CompressionStrategyTest for (int i = 0; i < indexed.size(); ++i) { Assert.assertEquals(vals[i], indexed.get(i), 0.0); } + + // test powers of 2 + setupSimpleWithSerde(2); + + Assert.assertEquals(9, supplier.getBaseFloatBuffers().size()); + + Assert.assertEquals(vals.length, indexed.size()); + for (int i = 0; i < indexed.size(); ++i) { + Assert.assertEquals(vals[i], indexed.get(i), 0.0); + } } @Test public void testBulkFillWithSerde() throws Exception { - setupSimpleWithSerde(); + setupSimpleWithSerde(5); tryFill(0, 15); tryFill(3, 6); @@ -158,8 +179,8 @@ public class CompressedFloatsIndexedSupplierTest extends CompressionStrategyTest @Test(expected = IndexOutOfBoundsException.class) public void testBulkFillTooMuchWithSerde() throws Exception { - setupSimpleWithSerde(); - tryFill(7, 10); + setupSimpleWithSerde(5); + tryFill(7, 11); } // This test attempts to cause a race condition with the DirectByteBuffers, it's non-deterministic in causing it, @@ -167,7 +188,7 @@ public class CompressedFloatsIndexedSupplierTest extends CompressionStrategyTest @Test public void testConcurrentThreadReads() throws Exception { - setupSimple(); + setupSimple(5); final AtomicReference reason = new AtomicReference("none");