From ce928d9636f73e6e0890031e609becd0e2df0f02 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Mon, 13 Apr 2015 21:47:22 -0700 Subject: [PATCH] add compressed ints column type --- .../CompressedIntBufferObjectStrategy.java | 72 ++++ .../data/CompressedIntsIndexedSupplier.java | 357 ++++++++++++++++++ .../CompressedIntsIndexedSupplierTest.java | 341 +++++++++++++++++ 3 files changed, 770 insertions(+) create mode 100644 processing/src/main/java/io/druid/segment/data/CompressedIntBufferObjectStrategy.java create mode 100644 processing/src/main/java/io/druid/segment/data/CompressedIntsIndexedSupplier.java create mode 100644 processing/src/test/java/io/druid/segment/data/CompressedIntsIndexedSupplierTest.java diff --git a/processing/src/main/java/io/druid/segment/data/CompressedIntBufferObjectStrategy.java b/processing/src/main/java/io/druid/segment/data/CompressedIntBufferObjectStrategy.java new file mode 100644 index 00000000000..7f621cab9f0 --- /dev/null +++ b/processing/src/main/java/io/druid/segment/data/CompressedIntBufferObjectStrategy.java @@ -0,0 +1,72 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.data; + +import com.google.common.collect.Ordering; +import com.google.common.primitives.Ints; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.IntBuffer; + +public class CompressedIntBufferObjectStrategy extends FixedSizeCompressedObjectStrategy +{ + public static final Ordering ORDERING = Ordering.natural().nullsFirst(); + + public static CompressedIntBufferObjectStrategy getBufferForOrder(final ByteOrder order, final CompressionStrategy compression, final int sizePer) + { + return new CompressedIntBufferObjectStrategy(order, compression, sizePer); + } + + private CompressedIntBufferObjectStrategy(final ByteOrder order, final CompressionStrategy compression, final int sizePer) + { + super( + order, + new BufferConverter() + { + @Override + public IntBuffer convert(ByteBuffer buf) + { + return buf.asIntBuffer(); + } + + @Override + public int compare(IntBuffer lhs, IntBuffer rhs) + { + return ORDERING.compare(lhs, rhs); + } + + @Override + public int sizeOf(int count) + { + return count * Ints.BYTES; + } + + @Override + public IntBuffer combine(ByteBuffer into, IntBuffer from) + { + return into.asIntBuffer().put(from); + } + }, + compression, + sizePer + ); + } +} diff --git a/processing/src/main/java/io/druid/segment/data/CompressedIntsIndexedSupplier.java b/processing/src/main/java/io/druid/segment/data/CompressedIntsIndexedSupplier.java new file mode 100644 index 00000000000..1f1aa90a35b --- /dev/null +++ b/processing/src/main/java/io/druid/segment/data/CompressedIntsIndexedSupplier.java @@ -0,0 +1,357 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.data; + +import com.google.common.base.Preconditions; +import com.google.common.io.Closeables; +import com.google.common.primitives.Ints; +import com.metamx.common.IAE; +import com.metamx.common.guava.CloseQuietly; +import io.druid.collections.ResourceHolder; +import io.druid.collections.StupidResourceHolder; +import io.druid.segment.CompressedPools; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.IntBuffer; +import java.nio.channels.WritableByteChannel; +import java.util.Iterator; +import java.util.List; + +public class CompressedIntsIndexedSupplier implements WritableSupplier +{ + public static final byte version = 0x2; + public static final int MAX_INTS_IN_BUFFER = CompressedPools.BUFFER_SIZE / Ints.BYTES; + + + private final int totalSize; + private final int sizePer; + private final GenericIndexed> baseIntBuffers; + private final CompressedObjectStrategy.CompressionStrategy compression; + + CompressedIntsIndexedSupplier( + int totalSize, + int sizePer, + GenericIndexed> baseIntBuffers, + CompressedObjectStrategy.CompressionStrategy compression + ) + { + this.totalSize = totalSize; + this.sizePer = sizePer; + this.baseIntBuffers = baseIntBuffers; + this.compression = compression; + } + + public int size() + { + return totalSize; + } + + @Override + public IndexedInts get() + { + final int div = Integer.numberOfTrailingZeros(sizePer); + final int rem = sizePer - 1; + final boolean powerOf2 = sizePer == (1 << div); + if(powerOf2) { + return new CompressedIndexedInts() { + @Override + public int get(int index) + { + // optimize division and remainder for powers of 2 + final int bufferNum = index >> div; + + if (bufferNum != currIndex) { + loadBuffer(bufferNum); + } + + final int bufferIndex = index & rem; + return buffer.get(buffer.position() + bufferIndex); + } + }; + } else { + return new CompressedIndexedInts(); + } + } + + public long getSerializedSize() + { + return 1 + // version + 4 + // totalSize + 4 + // sizePer + 1 + // compressionId + baseIntBuffers.getSerializedSize(); // data + } + + public void writeToChannel(WritableByteChannel channel) throws IOException + { + channel.write(ByteBuffer.wrap(new byte[]{version})); + channel.write(ByteBuffer.wrap(Ints.toByteArray(totalSize))); + channel.write(ByteBuffer.wrap(Ints.toByteArray(sizePer))); + channel.write(ByteBuffer.wrap(new byte[]{compression.getId()})); + baseIntBuffers.writeToChannel(channel); + } + + public CompressedIntsIndexedSupplier convertByteOrder(ByteOrder order) + { + return new CompressedIntsIndexedSupplier( + totalSize, + sizePer, + GenericIndexed.fromIterable(baseIntBuffers, CompressedIntBufferObjectStrategy.getBufferForOrder(order, compression, sizePer)), + compression + ); + } + + /** + * For testing. Do not use unless you like things breaking + */ + GenericIndexed> getBaseIntBuffers() + { + return baseIntBuffers; + } + + public static CompressedIntsIndexedSupplier fromByteBuffer(ByteBuffer buffer, ByteOrder order) + { + byte versionFromBuffer = buffer.get(); + + if (versionFromBuffer == version) { + final int totalSize = buffer.getInt(); + final int sizePer = buffer.getInt(); + final CompressedObjectStrategy.CompressionStrategy compression = CompressedObjectStrategy.CompressionStrategy.forId(buffer.get()); + return new CompressedIntsIndexedSupplier( + totalSize, + sizePer, + GenericIndexed.read(buffer, CompressedIntBufferObjectStrategy.getBufferForOrder(order, compression, sizePer)), + compression + ); + } + + throw new IAE("Unknown version[%s]", versionFromBuffer); + } + + public static CompressedIntsIndexedSupplier fromIntBuffer(IntBuffer buffer, final ByteOrder byteOrder, CompressedObjectStrategy.CompressionStrategy compression) + { + return fromIntBuffer(buffer, MAX_INTS_IN_BUFFER, byteOrder, compression); + } + + public static CompressedIntsIndexedSupplier fromIntBuffer( + final IntBuffer buffer, final int chunkFactor, final ByteOrder byteOrder, CompressedObjectStrategy.CompressionStrategy compression + ) + { + Preconditions.checkArgument( + chunkFactor <= MAX_INTS_IN_BUFFER, "Chunks must be <= 64k bytes. chunkFactor was[%s]", chunkFactor + ); + + return new CompressedIntsIndexedSupplier( + buffer.remaining(), + chunkFactor, + GenericIndexed.fromIterable( + new Iterable>() + { + @Override + public Iterator> iterator() + { + return new Iterator>() + { + IntBuffer myBuffer = buffer.asReadOnlyBuffer(); + + @Override + public boolean hasNext() + { + return myBuffer.hasRemaining(); + } + + @Override + public ResourceHolder next() + { + IntBuffer retVal = myBuffer.asReadOnlyBuffer(); + + if (chunkFactor < myBuffer.remaining()) { + retVal.limit(retVal.position() + chunkFactor); + } + myBuffer.position(myBuffer.position() + retVal.remaining()); + + return StupidResourceHolder.create(retVal); + } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } + }; + } + }, + CompressedIntBufferObjectStrategy.getBufferForOrder(byteOrder, compression, chunkFactor) + ), + compression + ); + } + + public static CompressedIntsIndexedSupplier fromList( + final List list , final int chunkFactor, final ByteOrder byteOrder, CompressedObjectStrategy.CompressionStrategy compression + ) + { + Preconditions.checkArgument( + chunkFactor <= MAX_INTS_IN_BUFFER, "Chunks must be <= 64k bytes. chunkFactor was[%s]", chunkFactor + ); + + return new CompressedIntsIndexedSupplier( + list.size(), + chunkFactor, + GenericIndexed.fromIterable( + new Iterable>() + { + @Override + public Iterator> iterator() + { + return new Iterator>() + { + int position = 0; + + @Override + public boolean hasNext() + { + return position < list.size(); + } + + @Override + public ResourceHolder next() + { + IntBuffer retVal = IntBuffer.allocate(chunkFactor); + + if (chunkFactor > list.size() - position) { + retVal.limit(list.size() - position); + } + final List ints = list.subList(position, position + retVal.remaining()); + for(int value : ints) { + retVal.put(value); + } + retVal.rewind(); + position += retVal.remaining(); + + return StupidResourceHolder.create(retVal); + } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } + }; + } + }, + CompressedIntBufferObjectStrategy.getBufferForOrder(byteOrder, compression, chunkFactor) + ), + compression + ); + } + + private class CompressedIndexedInts implements IndexedInts + { + final Indexed> singleThreadedIntBuffers = baseIntBuffers.singleThreaded(); + + int currIndex = -1; + ResourceHolder holder; + IntBuffer buffer; + + @Override + public int size() + { + return totalSize; + } + + @Override + public int get(int index) + { + final int bufferNum = index / sizePer; + final int bufferIndex = index % sizePer; + + if (bufferNum != currIndex) { + loadBuffer(bufferNum); + } + + return buffer.get(buffer.position() + bufferIndex); + } + + @Override + public Iterator iterator() + { + return new IndexedIntsIterator(this); + } + + @Override + public void fill(int index, int[] toFill) + { + if (totalSize - index < toFill.length) { + throw new IndexOutOfBoundsException( + String.format( + "Cannot fill array of size[%,d] at index[%,d]. Max size[%,d]", toFill.length, index, totalSize + ) + ); + } + + int bufferNum = index / sizePer; + int bufferIndex = index % sizePer; + + int leftToFill = toFill.length; + while (leftToFill > 0) { + if (bufferNum != currIndex) { + loadBuffer(bufferNum); + } + + buffer.mark(); + buffer.position(buffer.position() + bufferIndex); + final int numToGet = Math.min(buffer.remaining(), leftToFill); + buffer.get(toFill, toFill.length - leftToFill, numToGet); + buffer.reset(); + leftToFill -= numToGet; + ++bufferNum; + bufferIndex = 0; + } + } + + protected void loadBuffer(int bufferNum) + { + CloseQuietly.close(holder); + holder = singleThreadedIntBuffers.get(bufferNum); + buffer = holder.get(); + currIndex = bufferNum; + } + + @Override + public String toString() + { + return "CompressedIntsIndexedSupplier_Anonymous{" + + "currIndex=" + currIndex + + ", sizePer=" + sizePer + + ", numChunks=" + singleThreadedIntBuffers.size() + + ", totalSize=" + totalSize + + '}'; + } + + @Override + public void close() throws IOException + { + Closeables.close(holder, false); + } + } +} diff --git a/processing/src/test/java/io/druid/segment/data/CompressedIntsIndexedSupplierTest.java b/processing/src/test/java/io/druid/segment/data/CompressedIntsIndexedSupplierTest.java new file mode 100644 index 00000000000..8eb1663a775 --- /dev/null +++ b/processing/src/test/java/io/druid/segment/data/CompressedIntsIndexedSupplierTest.java @@ -0,0 +1,341 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.data; + +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; +import com.metamx.common.guava.CloseQuietly; +import io.druid.segment.CompressedPools; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.IntBuffer; +import java.nio.channels.Channels; +import java.util.Arrays; +import java.util.Collections; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +public class CompressedIntsIndexedSupplierTest extends CompressionStrategyTest +{ + public CompressedIntsIndexedSupplierTest(CompressedObjectStrategy.CompressionStrategy compressionStrategy) + { + super(compressionStrategy); + } + + private IndexedInts indexed; + private CompressedIntsIndexedSupplier supplier; + private int[] vals; + + @Before + public void setUp() throws Exception + { + CloseQuietly.close(indexed); + indexed = null; + supplier = null; + vals = null; + } + + @After + public void tearDown() throws Exception + { + CloseQuietly.close(indexed); + } + + private void setupSimple(final int chunkSize) + { + CloseQuietly.close(indexed); + + vals = new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16}; + + supplier = CompressedIntsIndexedSupplier.fromIntBuffer( + IntBuffer.wrap(vals), + chunkSize, + ByteOrder.nativeOrder(), + compressionStrategy + ); + + indexed = supplier.get(); + } + + private void setupSimpleWithSerde(final int chunkSize) throws IOException + { + vals = new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16}; + + makeWithSerde(chunkSize); + } + + private void makeWithSerde(final int chunkSize) throws IOException + { + CloseQuietly.close(indexed); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final CompressedIntsIndexedSupplier theSupplier = CompressedIntsIndexedSupplier.fromIntBuffer( + IntBuffer.wrap(vals), chunkSize, ByteOrder.nativeOrder(), compressionStrategy + ); + theSupplier.writeToChannel(Channels.newChannel(baos)); + + final byte[] bytes = baos.toByteArray(); + Assert.assertEquals(theSupplier.getSerializedSize(), bytes.length); + + supplier = CompressedIntsIndexedSupplier.fromByteBuffer(ByteBuffer.wrap(bytes), ByteOrder.nativeOrder()); + indexed = supplier.get(); + } + + private void setupLargeChunks(final int chunkSize, final int totalSize) throws IOException + { + vals = new int[totalSize]; + Random rand = new Random(0); + for(int i = 0; i < vals.length; ++i) { + vals[i] = rand.nextInt(); + } + + makeWithSerde(chunkSize); + } + + @Test + public void testSanity() throws Exception + { + setupSimple(5); + + Assert.assertEquals(4, supplier.getBaseIntBuffers().size()); + assertIndexMatchesVals(); + + // test powers of 2 + setupSimple(4); + Assert.assertEquals(4, supplier.getBaseIntBuffers().size()); + assertIndexMatchesVals(); + + setupSimple(32); + Assert.assertEquals(1, supplier.getBaseIntBuffers().size()); + assertIndexMatchesVals(); + } + + @Test + public void testLargeChunks() throws Exception + { + final int maxChunkSize = CompressedPools.BUFFER_SIZE / Longs.BYTES; + + setupLargeChunks(maxChunkSize, 10 * maxChunkSize); + Assert.assertEquals(10, supplier.getBaseIntBuffers().size()); + assertIndexMatchesVals(); + + setupLargeChunks(maxChunkSize, 10 * maxChunkSize + 1); + Assert.assertEquals(11, supplier.getBaseIntBuffers().size()); + assertIndexMatchesVals(); + + setupLargeChunks(maxChunkSize - 1, 10 * (maxChunkSize - 1) + 1); + Assert.assertEquals(11, supplier.getBaseIntBuffers().size()); + assertIndexMatchesVals(); + } + + @Test(expected = IllegalArgumentException.class) + public void testChunkTooBig() throws Exception + { + final int maxChunkSize = CompressedPools.BUFFER_SIZE / Ints.BYTES; + setupLargeChunks(maxChunkSize + 1, 10 * (maxChunkSize + 1)); + } + + @Test + public void testBulkFill() throws Exception + { + setupSimple(5); + + tryFill(0, 15); + tryFill(3, 6); + tryFill(7, 7); + tryFill(7, 9); + } + + @Test(expected = IndexOutOfBoundsException.class) + public void testBulkFillTooMuch() throws Exception + { + setupSimple(5); + tryFill(7, 10); + } + + @Test + public void testSanityWithSerde() throws Exception + { + setupSimpleWithSerde(5); + + Assert.assertEquals(4, supplier.getBaseIntBuffers().size()); + assertIndexMatchesVals(); + } + + @Test + public void testBulkFillWithSerde() throws Exception + { + setupSimpleWithSerde(5); + + tryFill(0, 15); + tryFill(3, 6); + tryFill(7, 7); + tryFill(7, 9); + } + + @Test(expected = IndexOutOfBoundsException.class) + public void testBulkFillTooMuchWithSerde() throws Exception + { + setupSimpleWithSerde(5); + tryFill(7, 10); + } + + // This test attempts to cause a race condition with the DirectByteBuffers, it's non-deterministic in causing it, + // which sucks but I can't think of a way to deterministically cause it... + @Test + public void testConcurrentThreadReads() throws Exception + { + setupSimple(5); + + final AtomicReference reason = new AtomicReference("none"); + + final int numRuns = 1000; + final CountDownLatch startLatch = new CountDownLatch(1); + final CountDownLatch stopLatch = new CountDownLatch(2); + final AtomicBoolean failureHappened = new AtomicBoolean(false); + new Thread(new Runnable() + { + @Override + public void run() + { + try { + startLatch.await(); + } + catch (InterruptedException e) { + failureHappened.set(true); + reason.set("interrupt."); + stopLatch.countDown(); + return; + } + + try { + for (int i = 0; i < numRuns; ++i) { + for (int j = 0; j < indexed.size(); ++j) { + final long val = vals[j]; + final long indexedVal = indexed.get(j); + if (Longs.compare(val, indexedVal) != 0) { + failureHappened.set(true); + reason.set(String.format("Thread1[%d]: %d != %d", j, val, indexedVal)); + stopLatch.countDown(); + return; + } + } + } + } + catch (Exception e) { + e.printStackTrace(); + failureHappened.set(true); + reason.set(e.getMessage()); + } + + stopLatch.countDown(); + } + }).start(); + + final IndexedInts indexed2 = supplier.get(); + try { + new Thread(new Runnable() + { + @Override + public void run() + { + try { + startLatch.await(); + } + catch (InterruptedException e) { + stopLatch.countDown(); + return; + } + + try { + for (int i = 0; i < numRuns; ++i) { + for (int j = indexed2.size() - 1; j >= 0; --j) { + final long val = vals[j]; + final long indexedVal = indexed2.get(j); + if (Longs.compare(val, indexedVal) != 0) { + failureHappened.set(true); + reason.set(String.format("Thread2[%d]: %d != %d", j, val, indexedVal)); + stopLatch.countDown(); + return; + } + } + } + } + catch (Exception e) { + e.printStackTrace(); + reason.set(e.getMessage()); + failureHappened.set(true); + } + + stopLatch.countDown(); + } + }).start(); + + startLatch.countDown(); + + stopLatch.await(); + } + finally { + CloseQuietly.close(indexed2); + } + + if (failureHappened.get()) { + Assert.fail("Failure happened. Reason: " + reason.get()); + } + } + + private void tryFill(final int startIndex, final int size) + { + int[] filled = new int[size]; + indexed.fill(startIndex, filled); + + for (int i = startIndex; i < filled.length; i++) { + Assert.assertEquals(vals[i + startIndex], filled[i]); + } + } + + private void assertIndexMatchesVals() + { + Assert.assertEquals(vals.length, indexed.size()); + + // sequential access + int[] indices = new int[vals.length]; + for (int i = 0; i < indexed.size(); ++i) { + Assert.assertEquals(vals[i], indexed.get(i), 0.0); + indices[i] = i; + } + + Collections.shuffle(Arrays.asList(indices)); + // random access + for (int i = 0; i < indexed.size(); ++i) { + int k = indices[i]; + Assert.assertEquals(vals[k], indexed.get(k), 0.0); + } + } +}