diff --git a/processing/src/main/java/io/druid/segment/CompressedVSizeIndexedV3Supplier.java b/processing/src/main/java/io/druid/segment/CompressedVSizeIndexedV3Supplier.java
new file mode 100644
index 00000000000..657064bf462
--- /dev/null
+++ b/processing/src/main/java/io/druid/segment/CompressedVSizeIndexedV3Supplier.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.segment;
+
+import com.metamx.common.IAE;
+import io.druid.segment.data.CompressedIntsIndexedSupplier;
+import io.druid.segment.data.CompressedObjectStrategy;
+import io.druid.segment.data.CompressedVSizeIntsIndexedSupplier;
+import io.druid.segment.data.IndexedInts;
+import io.druid.segment.data.IndexedMultivalue;
+import io.druid.segment.data.WritableSupplier;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * The format is mostly the same as CompressedVSizeIndexedSupplier (which has version 0x2, so we call it V2);
+ * the only difference is that in V3 the offsets are not VSize-encoded, they are simply compressed.
+ * We provide this format so that the data can be streamed out in binary form with
+ * CompressedVSizeIndexedV3Writer: to stream VSizeInts we would have to know the max value in the value
+ * set up front. That is easy for the values themselves (e.g. the dimension cardinality while encoding a
+ * dimension), but difficult for the offsets.
+ */
+public class CompressedVSizeIndexedV3Supplier implements WritableSupplier<IndexedMultivalue<IndexedInts>>
+{
+  public static final byte version = 0x3;
+
+  private final CompressedIntsIndexedSupplier offsetSupplier;
+  private final CompressedVSizeIntsIndexedSupplier valueSupplier;
+
+  CompressedVSizeIndexedV3Supplier(
+      CompressedIntsIndexedSupplier offsetSupplier,
+      CompressedVSizeIntsIndexedSupplier valueSupplier
+  )
+  {
+    this.offsetSupplier = offsetSupplier;
+    this.valueSupplier = valueSupplier;
+  }
+
+  public static CompressedVSizeIndexedV3Supplier fromByteBuffer(ByteBuffer buffer, ByteOrder order)
+  {
+    byte versionFromBuffer = buffer.get();
+
+    if (versionFromBuffer == version) {
+      CompressedIntsIndexedSupplier offsetSupplier = CompressedIntsIndexedSupplier.fromByteBuffer(
+          buffer,
+          order
+      );
+      CompressedVSizeIntsIndexedSupplier valueSupplier = CompressedVSizeIntsIndexedSupplier.fromByteBuffer(
+          buffer,
+          order
+      );
+      return new CompressedVSizeIndexedV3Supplier(offsetSupplier, valueSupplier);
+    }
+    throw new IAE("Unknown version[%s]", versionFromBuffer);
+  }
+
+  // for test
+  public static CompressedVSizeIndexedV3Supplier fromIterable(
+      Iterable<IndexedInts> objectsIterable,
+      int offsetChunkFactor,
+      int maxValue,
+      final ByteOrder byteOrder,
+      CompressedObjectStrategy.CompressionStrategy compression
+  )
+  {
+    Iterator<IndexedInts> objects = objectsIterable.iterator();
+    List<Integer> offsetList = new ArrayList<>();
+    List<Integer> values = new ArrayList<>();
+
+    int offset = 0;
+    while (objects.hasNext()) {
+      IndexedInts next = objects.next();
+      offsetList.add(offset);
+      for (int i = 0; i < next.size(); i++) {
+        values.add(next.get(i));
+      }
+      offset += next.size();
+    }
+    offsetList.add(offset);
+    CompressedIntsIndexedSupplier headerSupplier = CompressedIntsIndexedSupplier.fromList(
+        offsetList,
+        offsetChunkFactor,
+        byteOrder,
+        compression
+    );
+    CompressedVSizeIntsIndexedSupplier valuesSupplier = CompressedVSizeIntsIndexedSupplier.fromList(
+        values,
+        maxValue,
+        CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForValue(maxValue),
+        byteOrder,
+        compression
+    );
+    return new CompressedVSizeIndexedV3Supplier(headerSupplier, valuesSupplier);
+  }
+
+  @Override
+  public long getSerializedSize()
+  {
+    return 1 + offsetSupplier.getSerializedSize() + valueSupplier.getSerializedSize();
+  }
+
+  @Override
+  public void writeToChannel(WritableByteChannel channel) throws IOException
+  {
+    channel.write(ByteBuffer.wrap(new byte[]{version}));
+    offsetSupplier.writeToChannel(channel);
+    valueSupplier.writeToChannel(channel);
+  }
+
+  @Override
+  public IndexedMultivalue<IndexedInts> get()
+  {
+    return new CompressedVSizeIndexedSupplier.CompressedVSizeIndexed(offsetSupplier.get(), valueSupplier.get());
+  }
+
+}
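
To make the V3 layout concrete, here is a minimal round-trip sketch using only the API above (fromIterable is test-only, per its comment). The two-row fixture, the chunk factor of 2, and the LZ4 choice are illustrative, not part of the patch:

    List<int[]> rows = Arrays.asList(new int[]{1, 2}, new int[]{3});
    CompressedVSizeIndexedV3Supplier supplier = CompressedVSizeIndexedV3Supplier.fromIterable(
        Iterables.transform(
            rows,
            new Function<int[], IndexedInts>()
            {
              @Override
              public IndexedInts apply(int[] input)
              {
                return VSizeIndexedInts.fromArray(input, 3); // 3 = max value across all rows
              }
            }
        ),
        2,                        // offsetChunkFactor
        3,                        // maxValue
        ByteOrder.nativeOrder(),
        CompressedObjectStrategy.CompressionStrategy.LZ4
    );

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    // writes the version byte 0x3, then the compressed offsets, then the compressed values
    supplier.writeToChannel(Channels.newChannel(baos));

    IndexedMultivalue<IndexedInts> rowsAgain = CompressedVSizeIndexedV3Supplier
        .fromByteBuffer(ByteBuffer.wrap(baos.toByteArray()), ByteOrder.nativeOrder())
        .get();
    // rowsAgain.get(1).get(0) == 3
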
diff --git a/processing/src/main/java/io/druid/segment/data/CompressedIntsIndexedWriter.java b/processing/src/main/java/io/druid/segment/data/CompressedIntsIndexedWriter.java
new file mode 100644
index 00000000000..d12a2c8434d
--- /dev/null
+++ b/processing/src/main/java/io/druid/segment/data/CompressedIntsIndexedWriter.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.segment.data;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.primitives.Ints;
+import io.druid.collections.ResourceHolder;
+import io.druid.collections.StupidResourceHolder;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.IntBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * Streams an array of integers out in the binary format described by CompressedIntsIndexedSupplier
+ */
+public class CompressedIntsIndexedWriter
+{
+  public static final byte version = CompressedIntsIndexedSupplier.version;
+
+  private final int chunkFactor;
+  private final ByteOrder byteOrder;
+  private final CompressedObjectStrategy.CompressionStrategy compression;
+  private final GenericIndexedWriter<ResourceHolder<IntBuffer>> flattener;
+  private IntBuffer endBuffer;
+  private int numInserted;
+
+  public CompressedIntsIndexedWriter(
+      final IOPeon ioPeon,
+      final String filenameBase,
+      final int chunkFactor,
+      final ByteOrder byteOrder,
+      final CompressedObjectStrategy.CompressionStrategy compression
+  )
+  {
+    this.chunkFactor = chunkFactor;
+    this.byteOrder = byteOrder;
+    this.compression = compression;
+    this.flattener = new GenericIndexedWriter<>(
+        ioPeon, filenameBase, CompressedIntBufferObjectStrategy.getBufferForOrder(byteOrder, compression, chunkFactor)
+    );
+    this.endBuffer = IntBuffer.allocate(chunkFactor);
+    this.numInserted = 0;
+  }
+
+  public void open() throws IOException
+  {
+    flattener.open();
+  }
+
+  public void add(int val) throws IOException
+  {
+    if (!endBuffer.hasRemaining()) {
+      endBuffer.rewind();
+      flattener.write(StupidResourceHolder.create(endBuffer));
+      endBuffer.rewind();
+    }
+    endBuffer.put(val);
+    numInserted++;
+  }
+
+  public long closeAndWriteToChannel(WritableByteChannel channel) throws IOException
+  {
+    if (numInserted > 0) {
+      endBuffer.limit(endBuffer.position());
+      endBuffer.rewind();
+      flattener.write(StupidResourceHolder.create(endBuffer));
+    }
+    endBuffer = null;
+    flattener.close();
+
+    channel.write(ByteBuffer.wrap(new byte[]{version}));
+    channel.write(ByteBuffer.wrap(Ints.toByteArray(numInserted)));
+    channel.write(ByteBuffer.wrap(Ints.toByteArray(chunkFactor)));
+    channel.write(ByteBuffer.wrap(new byte[]{compression.getId()}));
+    final ReadableByteChannel from = Channels.newChannel(flattener.combineStreams().getInput());
+    long dataLen = ByteStreams.copy(from, channel);
+    return 1 +            // version
+           Ints.BYTES +   // numInserted
+           Ints.BYTES +   // chunkFactor
+           1 +            // compression id
+           dataLen;       // data
+  }
+}
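
A short usage sketch, mirroring what the writer test further down does; TmpFileIOPeon and the "test"/"output" names are just the fixtures used there, and the chunk factor of 1000 is arbitrary:

    IOPeon ioPeon = new TmpFileIOPeon();
    CompressedIntsIndexedWriter writer = new CompressedIntsIndexedWriter(
        ioPeon, "test", 1000, ByteOrder.nativeOrder(), CompressedObjectStrategy.CompressionStrategy.LZ4
    );
    writer.open();
    for (int i = 0; i < 10000; i++) {
      writer.add(i);
    }
    WritableByteChannel channel = Channels.newChannel(ioPeon.makeOutputStream("output"));
    long numBytesWritten = writer.closeAndWriteToChannel(channel); // returns the serialized length
    channel.close();
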
diff --git a/processing/src/main/java/io/druid/segment/data/CompressedVSizeIndexedV3Writer.java b/processing/src/main/java/io/druid/segment/data/CompressedVSizeIndexedV3Writer.java
new file mode 100644
index 00000000000..8b39d44eebb
--- /dev/null
+++ b/processing/src/main/java/io/druid/segment/data/CompressedVSizeIndexedV3Writer.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.segment.data;
+
+import io.druid.segment.CompressedVSizeIndexedV3Supplier;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * Streams arrays of integers out in the binary format described by CompressedVSizeIndexedV3Supplier
+ */
+public class CompressedVSizeIndexedV3Writer
+{
+  public static final byte version = CompressedVSizeIndexedV3Supplier.version;
+
+  private final CompressedIntsIndexedWriter offsetWriter;
+  private final CompressedVSizeIntsIndexedWriter valueWriter;
+  private int offset;
+
+  public CompressedVSizeIndexedV3Writer(
+      CompressedIntsIndexedWriter offsetWriter,
+      CompressedVSizeIntsIndexedWriter valueWriter
+  )
+  {
+    this.offsetWriter = offsetWriter;
+    this.valueWriter = valueWriter;
+    this.offset = 0;
+  }
+
+  public void open() throws IOException
+  {
+    offsetWriter.open();
+    valueWriter.open();
+  }
+
+  public void add(int[] vals) throws IOException
+  {
+    offsetWriter.add(offset);
+    for (int val : vals) {
+      valueWriter.add(val);
+    }
+    offset += vals.length;
+  }
+
+  public long closeAndWriteToChannel(WritableByteChannel channel) throws IOException
+  {
+    channel.write(ByteBuffer.wrap(new byte[]{version}));
+    offsetWriter.add(offset);
+    long offsetLen = offsetWriter.closeAndWriteToChannel(channel);
+    long dataLen = valueWriter.closeAndWriteToChannel(channel);
+    return 1 + offsetLen + dataLen;
+  }
+}
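
Note that closeAndWriteToChannel appends one final offset, so the offsets column always holds (number of rows + 1) entries, and a reader recovers row i as the value range [offsets[i], offsets[i+1]). A worked example:

    // rows added:     {1, 2}   {3, 4, 5}   {}
    // offsets column: 0, 2, 5, 5        <- trailing 5 written at close time
    // values column:  1, 2, 3, 4, 5
    // row 1 = values[2..5) = {3, 4, 5}; row 2 = values[5..5) = {} (empty)
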
diff --git a/processing/src/main/java/io/druid/segment/data/CompressedVSizeIntsIndexedSupplier.java b/processing/src/main/java/io/druid/segment/data/CompressedVSizeIntsIndexedSupplier.java
index 20aed86992f..0b1a34b70d1 100644
--- a/processing/src/main/java/io/druid/segment/data/CompressedVSizeIntsIndexedSupplier.java
+++ b/processing/src/main/java/io/druid/segment/data/CompressedVSizeIntsIndexedSupplier.java
@@ -79,7 +79,7 @@ public class CompressedVSizeIntsIndexedSupplier implements WritableSupplier<IndexedInts>
     return maxIntsInBufferForBytes(VSizeIndexedInts.getNumBytesForMax(maxValue));
   }
 
-  private static int bufferPadding(int numBytes)
+  public static int bufferPadding(int numBytes)
   {
     // when numBytes == 3 we need to pad the buffer to allow reading an extra byte
diff --git a/processing/src/main/java/io/druid/segment/data/CompressedVSizeIntsIndexedWriter.java b/processing/src/main/java/io/druid/segment/data/CompressedVSizeIntsIndexedWriter.java
new file mode 100644
--- /dev/null
+++ b/processing/src/main/java/io/druid/segment/data/CompressedVSizeIntsIndexedWriter.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.segment.data;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.primitives.Ints;
+import io.druid.collections.ResourceHolder;
+import io.druid.collections.StupidResourceHolder;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * Streams an array of integers out in the binary format described by CompressedVSizeIntsIndexedSupplier
+ */
+public class CompressedVSizeIntsIndexedWriter
+{
+  public static final byte version = CompressedVSizeIntsIndexedSupplier.version;
+
+  private final int numBytes;
+  private final int chunkFactor;
+  private final int chunkBytes;
+  private final ByteOrder byteOrder;
+  private final CompressedObjectStrategy.CompressionStrategy compression;
+  private final GenericIndexedWriter<ResourceHolder<ByteBuffer>> flattener;
+  private final ByteBuffer intBuffer;
+  private ByteBuffer endBuffer;
+  private int numInserted;
+
+  public CompressedVSizeIntsIndexedWriter(
+      final IOPeon ioPeon,
+      final String filenameBase,
+      final int maxValue,
+      final int chunkFactor,
+      final ByteOrder byteOrder,
+      final CompressedObjectStrategy.CompressionStrategy compression
+  )
+  {
+    this.numBytes = VSizeIndexedInts.getNumBytesForMax(maxValue);
+    this.chunkFactor = chunkFactor;
+    this.chunkBytes = chunkFactor * numBytes + CompressedVSizeIntsIndexedSupplier.bufferPadding(numBytes);
+    this.byteOrder = byteOrder;
+    this.compression = compression;
+    this.flattener = new GenericIndexedWriter<>(
+        ioPeon, filenameBase, CompressedByteBufferObjectStrategy.getBufferForOrder(byteOrder, compression, chunkBytes)
+    );
+    this.intBuffer = ByteBuffer.allocate(Ints.BYTES).order(byteOrder);
+    this.endBuffer = ByteBuffer.allocate(chunkBytes).order(byteOrder);
+    this.endBuffer.limit(numBytes * chunkFactor);
+    this.numInserted = 0;
+  }
+
+  public void open() throws IOException
+  {
+    flattener.open();
+  }
+
+  public void add(int val) throws IOException
+  {
+    if (!endBuffer.hasRemaining()) {
+      endBuffer.rewind();
+      flattener.write(StupidResourceHolder.create(endBuffer));
+      endBuffer.rewind();
+      endBuffer.limit(numBytes * chunkFactor);
+    }
+    intBuffer.putInt(0, val);
+    if (byteOrder.equals(ByteOrder.BIG_ENDIAN)) {
+      endBuffer.put(intBuffer.array(), Ints.BYTES - numBytes, numBytes);
+    } else {
+      endBuffer.put(intBuffer.array(), 0, numBytes);
+    }
+    numInserted++;
+  }
+
+  public long closeAndWriteToChannel(WritableByteChannel channel) throws IOException
+  {
+    if (numInserted > 0) {
+      endBuffer.limit(endBuffer.position());
+      endBuffer.rewind();
+      flattener.write(StupidResourceHolder.create(endBuffer));
+    }
+    endBuffer = null;
+    flattener.close();
+
+    channel.write(ByteBuffer.wrap(new byte[]{version, (byte) numBytes}));
+    channel.write(ByteBuffer.wrap(Ints.toByteArray(numInserted)));
+    channel.write(ByteBuffer.wrap(Ints.toByteArray(chunkFactor)));
+    channel.write(ByteBuffer.wrap(new byte[]{compression.getId()}));
+    final ReadableByteChannel from = Channels.newChannel(flattener.combineStreams().getInput());
+    long dataLen = ByteStreams.copy(from, channel);
+    return 1 +            // version
+           1 +            // numBytes
+           Ints.BYTES +   // numInserted
+           Ints.BYTES +   // chunkFactor
+           1 +            // compression id
+           dataLen;       // data
+  }
+}
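
The add method packs only the low numBytes bytes of each value: the 4-byte scratch buffer is filled in the target byte order, and the significant bytes are copied from the appropriate end. A worked example, assuming numBytes == 3:

    // val = 0x00012345, numBytes = 3 (maxValue fits in 3 bytes)
    //
    // big-endian scratch buffer:    [00, 01, 23, 45] -> copy from index Ints.BYTES - numBytes = 1
    //                               -> appends [01, 23, 45]
    // little-endian scratch buffer: [45, 23, 01, 00] -> copy from index 0
    //                               -> appends [45, 23, 01]
    //
    // Either way the high-order zero byte is dropped, so each value costs numBytes bytes on disk.
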
diff --git a/processing/src/test/java/io/druid/segment/CompressedVSizeIndexedV3SupplierTest.java b/processing/src/test/java/io/druid/segment/CompressedVSizeIndexedV3SupplierTest.java
new file mode 100644
index 00000000000..557d01c4c1c
--- /dev/null
+++ b/processing/src/test/java/io/druid/segment/CompressedVSizeIndexedV3SupplierTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.segment;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import io.druid.segment.data.CompressedObjectStrategy;
+import io.druid.segment.data.CompressedVSizeIndexedSupplierTest;
+import io.druid.segment.data.IndexedInts;
+import io.druid.segment.data.IndexedMultivalue;
+import io.druid.segment.data.VSizeIndexedInts;
+import io.druid.segment.data.WritableSupplier;
+import org.junit.After;
+import org.junit.Before;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+
+public class CompressedVSizeIndexedV3SupplierTest extends CompressedVSizeIndexedSupplierTest
+{
+  @Before
+  public void setUpSimple()
+  {
+    vals = Arrays.asList(
+        new int[1],
+        new int[]{1, 2, 3, 4, 5},
+        new int[]{6, 7, 8, 9, 10},
+        new int[]{11, 12, 13, 14, 15, 16, 17, 18, 19, 20}
+    );
+
+    indexedSupplier = CompressedVSizeIndexedV3Supplier.fromIterable(
+        Iterables.transform(
+            vals,
+            new Function<int[], IndexedInts>()
+            {
+              @Override
+              public IndexedInts apply(int[] input)
+              {
+                return VSizeIndexedInts.fromArray(input, 20);
+              }
+            }
+        ), 2, 20, ByteOrder.nativeOrder(),
+        CompressedObjectStrategy.CompressionStrategy.LZ4
+    );
+  }
+
+  @After
+  public void teardown()
+  {
+    indexedSupplier = null;
+    vals = null;
+  }
+
+  @Override
+  protected WritableSupplier<IndexedMultivalue<IndexedInts>> fromByteBuffer(ByteBuffer buffer, ByteOrder order)
+  {
+    return CompressedVSizeIndexedV3Supplier.fromByteBuffer(
+        buffer, ByteOrder.nativeOrder()
+    );
+  }
+}
\ No newline at end of file
diff --git a/processing/src/test/java/io/druid/segment/data/CompressedIntsIndexedWriterTest.java b/processing/src/test/java/io/druid/segment/data/CompressedIntsIndexedWriterTest.java
new file mode 100644
index 00000000000..6cbfdb0e6e6
--- /dev/null
+++ b/processing/src/test/java/io/druid/segment/data/CompressedIntsIndexedWriterTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.segment.data;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Ints;
+import com.metamx.common.guava.CloseQuietly;
+import org.apache.commons.io.IOUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class CompressedIntsIndexedWriterTest
+{
+  @Parameterized.Parameters(name = "{index}: compression={0}, byteOrder={1}")
+  public static Iterable<Object[]> compressionStrategiesAndByteOrders()
+  {
+    Set<List<Object>> combinations = Sets.cartesianProduct(
+        Sets.newHashSet(CompressedObjectStrategy.CompressionStrategy.values()),
+        Sets.newHashSet(ByteOrder.BIG_ENDIAN, ByteOrder.LITTLE_ENDIAN)
+    );
+
+    return Iterables.transform(
+        combinations, new Function<List<Object>, Object[]>()
+        {
+          @Override
+          public Object[] apply(List<Object> input)
+          {
+            return new Object[]{input.get(0), input.get(1)};
+          }
+        }
+    );
+  }
+
+  private static final int[] MAX_VALUES = new int[]{0xFF, 0xFFFF, 0xFFFFFF, 0x0FFFFFFF};
+  private static final int[] CHUNK_FACTORS = new int[]{1, 2, 100, CompressedIntsIndexedSupplier.MAX_INTS_IN_BUFFER};
+
+  private final IOPeon ioPeon = new TmpFileIOPeon();
+  private final CompressedObjectStrategy.CompressionStrategy compressionStrategy;
+  private final ByteOrder byteOrder;
+  private final Random rand = new Random(0);
+  private int[] vals;
+
+  public CompressedIntsIndexedWriterTest(
+      CompressedObjectStrategy.CompressionStrategy compressionStrategy,
+      ByteOrder byteOrder
+  )
+  {
+    this.compressionStrategy = compressionStrategy;
+    this.byteOrder = byteOrder;
+  }
+
+  @Before
+  public void setUp() throws Exception
+  {
+    vals = null;
+  }
+
+  @After
+  public void tearDown() throws Exception
+  {
+    ioPeon.cleanup();
+  }
+
+  private void generateVals(final int totalSize, final int maxValue) throws IOException
+  {
+    vals = new int[totalSize];
+    for (int i = 0; i < vals.length; ++i) {
+      vals[i] = rand.nextInt(maxValue);
+    }
+  }
+
+  private void checkSerializedSizeAndData(int chunkFactor) throws Exception
+  {
+    CompressedIntsIndexedWriter writer = new CompressedIntsIndexedWriter(
+        ioPeon, "test", chunkFactor, byteOrder, compressionStrategy
+    );
+    CompressedIntsIndexedSupplier supplierFromList = CompressedIntsIndexedSupplier.fromList(
+        Ints.asList(vals), chunkFactor, byteOrder, compressionStrategy
+    );
+    writer.open();
+    for (int val : vals) {
+      writer.add(val);
+    }
+    final WritableByteChannel outputChannel = Channels.newChannel(ioPeon.makeOutputStream("output"));
+    long writtenLength = writer.closeAndWriteToChannel(outputChannel);
+    outputChannel.close();
+
+    assertEquals(writtenLength, supplierFromList.getSerializedSize());
+
+    // read from ByteBuffer and check values
+    CompressedIntsIndexedSupplier supplierFromByteBuffer = CompressedIntsIndexedSupplier.fromByteBuffer(
+        ByteBuffer.wrap(IOUtils.toByteArray(ioPeon.makeInputStream("output"))), byteOrder
+    );
+    IndexedInts indexedInts = supplierFromByteBuffer.get();
+    assertEquals(vals.length, indexedInts.size());
+    for (int i = 0; i < vals.length; ++i) {
+      assertEquals(vals[i], indexedInts.get(i));
+    }
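+    // closing the IndexedInts releases the compressed buffers held by the reader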
+    CloseQuietly.close(indexedInts);
+  }
+
+  @Test
+  public void testSmallData() throws Exception
+  {
+    // less than one chunk
+    for (int maxValue : MAX_VALUES) {
+      for (int chunkFactor : CHUNK_FACTORS) {
+        generateVals(rand.nextInt(chunkFactor), maxValue);
+        checkSerializedSizeAndData(chunkFactor);
+      }
+    }
+  }
+
+  @Test
+  public void testLargeData() throws Exception
+  {
+    // more than one chunk
+    for (int maxValue : MAX_VALUES) {
+      for (int chunkFactor : CHUNK_FACTORS) {
+        generateVals((rand.nextInt(5) + 5) * chunkFactor + rand.nextInt(chunkFactor), maxValue);
+        checkSerializedSizeAndData(chunkFactor);
+      }
+    }
+  }
+
+  @Test
+  public void testWriteEmpty() throws Exception
+  {
+    vals = new int[0];
+    checkSerializedSizeAndData(2);
+  }
+}
\ No newline at end of file
diff --git a/processing/src/test/java/io/druid/segment/data/CompressedVSizeIndexedSupplierTest.java b/processing/src/test/java/io/druid/segment/data/CompressedVSizeIndexedSupplierTest.java
index eaf98217281..1b52dd75707 100644
--- a/processing/src/test/java/io/druid/segment/data/CompressedVSizeIndexedSupplierTest.java
+++ b/processing/src/test/java/io/druid/segment/data/CompressedVSizeIndexedSupplierTest.java
@@ -40,12 +40,13 @@ import java.util.List;
  */
 public class CompressedVSizeIndexedSupplierTest
 {
-  private List<int[]> vals;
+  protected List<int[]> vals;
 
-  private CompressedVSizeIndexedSupplier indexedSupplier;
+  protected WritableSupplier<IndexedMultivalue<IndexedInts>> indexedSupplier;
 
   @Before
-  public void setUpSimple(){
+  public void setUpSimple()
+  {
     vals = Arrays.asList(
         new int[1],
         new int[]{1, 2, 3, 4, 5},
@@ -70,7 +71,8 @@ public class CompressedVSizeIndexedSupplierTest
   }
 
   @After
-  public void teardown(){
+  public void teardown()
+  {
     indexedSupplier = null;
     vals = null;
   }
@@ -89,7 +91,7 @@ public class CompressedVSizeIndexedSupplierTest
     final byte[] bytes = baos.toByteArray();
     Assert.assertEquals(indexedSupplier.getSerializedSize(), bytes.length);
 
-    CompressedVSizeIndexedSupplier deserializedIndexed = CompressedVSizeIndexedSupplier.fromByteBuffer(
+    WritableSupplier<IndexedMultivalue<IndexedInts>> deserializedIndexed = fromByteBuffer(
         ByteBuffer.wrap(bytes),
         ByteOrder.nativeOrder()
     );
@@ -98,26 +100,28 @@ public class CompressedVSizeIndexedSupplierTest
   }
 
   @Test(expected = IllegalArgumentException.class)
-  public void testGetInvalidElementInRow(){
+  public void testGetInvalidElementInRow()
+  {
     indexedSupplier.get().get(3).get(15);
   }
 
   @Test
-  public void testIterators(){
+  public void testIterators()
+  {
     Iterator<IndexedInts> iterator = indexedSupplier.get().iterator();
     int row = 0;
-    while(iterator.hasNext()){
+    while (iterator.hasNext()) {
       final int[] ints = vals.get(row);
       final IndexedInts vSizeIndexedInts = iterator.next();
 
       Assert.assertEquals(ints.length, vSizeIndexedInts.size());
 
       Iterator<Integer> valsIterator = vSizeIndexedInts.iterator();
-      int j=0;
-      while(valsIterator.hasNext()){
-        Assert.assertEquals((Integer)ints[j], valsIterator.next());
+      int j = 0;
+      while (valsIterator.hasNext()) {
+        Assert.assertEquals((Integer) ints[j], valsIterator.next());
         j++;
       }
-      row ++;
+      row++;
     }
   }
@@ -134,4 +138,11 @@ public class CompressedVSizeIndexedSupplierTest
       }
     }
   }
+
+  protected WritableSupplier<IndexedMultivalue<IndexedInts>> fromByteBuffer(ByteBuffer buffer, ByteOrder order)
+  {
+    return CompressedVSizeIndexedSupplier.fromByteBuffer(
+        buffer, ByteOrder.nativeOrder()
+    );
+  }
 }
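
The visibility changes above (private to protected, plus the new fromByteBuffer hook) are what let CompressedVSizeIndexedV3SupplierTest, earlier in this patch, rerun the whole V2 assertion suite against the V3 format. A sketch of the extension pattern (SomeV3FormatTest is a hypothetical name; the real subclass also replaces the setup fixture):

    // override only the deserializer; every inherited @Test runs against the new format
    public class SomeV3FormatTest extends CompressedVSizeIndexedSupplierTest
    {
      @Override
      protected WritableSupplier<IndexedMultivalue<IndexedInts>> fromByteBuffer(ByteBuffer buffer, ByteOrder order)
      {
        return CompressedVSizeIndexedV3Supplier.fromByteBuffer(buffer, order);
      }
    }
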
diff --git a/processing/src/test/java/io/druid/segment/data/CompressedVSizeIndexedV3WriterTest.java b/processing/src/test/java/io/druid/segment/data/CompressedVSizeIndexedV3WriterTest.java
new file mode 100644
index 00000000000..414468d510e
--- /dev/null
+++ b/processing/src/test/java/io/druid/segment/data/CompressedVSizeIndexedV3WriterTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.segment.data;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Ints;
+import com.metamx.common.guava.CloseQuietly;
+import io.druid.segment.CompressedVSizeIndexedV3Supplier;
+import org.apache.commons.io.IOUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class CompressedVSizeIndexedV3WriterTest
+{
+  @Parameterized.Parameters(name = "{index}: compression={0}, byteOrder={1}")
+  public static Iterable<Object[]> compressionStrategiesAndByteOrders()
+  {
+    Set<List<Object>> combinations = Sets.cartesianProduct(
+        Sets.newHashSet(CompressedObjectStrategy.CompressionStrategy.values()),
+        Sets.newHashSet(ByteOrder.BIG_ENDIAN, ByteOrder.LITTLE_ENDIAN)
+    );
+
+    return Iterables.transform(
+        combinations, new Function<List<Object>, Object[]>()
+        {
+          @Override
+          public Object[] apply(List<Object> input)
+          {
+            return new Object[]{input.get(0), input.get(1)};
+          }
+        }
+    );
+  }
+
+  private static final int[] OFFSET_CHUNK_FACTORS = new int[]{
+      1,
+      2,
+      100,
+      CompressedIntsIndexedSupplier.MAX_INTS_IN_BUFFER
+  };
+  private static final int[] MAX_VALUES = new int[]{0xFF, 0xFFFF, 0xFFFFFF, 0x0FFFFFFF};
+
+  private final IOPeon ioPeon = new TmpFileIOPeon();
+  private final CompressedObjectStrategy.CompressionStrategy compressionStrategy;
+  private final ByteOrder byteOrder;
+  private final Random rand = new Random(0);
+  private List<int[]> vals;
+
+  public CompressedVSizeIndexedV3WriterTest(
+      CompressedObjectStrategy.CompressionStrategy compressionStrategy,
+      ByteOrder byteOrder
+  )
+  {
+    this.compressionStrategy = compressionStrategy;
+    this.byteOrder = byteOrder;
+  }
+
+  private void generateVals(final int totalSize, final int maxValue) throws IOException
+  {
+    vals = new ArrayList<>(totalSize);
+    for (int i = 0; i < totalSize; ++i) {
+      int len = rand.nextInt(2) + 1;
+      int[] subVals = new int[len];
+      for (int j = 0; j < len; ++j) {
+        subVals[j] = rand.nextInt(maxValue);
+      }
+      vals.add(subVals);
+    }
+  }
+
+  private void checkSerializedSizeAndData(int offsetChunkFactor, int valueChunkFactor) throws Exception
+  {
+    int maxValue = vals.size() > 0 ? getMaxValue(vals) : 0;
+    CompressedIntsIndexedWriter offsetWriter = new CompressedIntsIndexedWriter(
+        ioPeon, "offset", offsetChunkFactor, byteOrder, compressionStrategy
+    );
+    CompressedVSizeIntsIndexedWriter valueWriter = new CompressedVSizeIntsIndexedWriter(
+        ioPeon, "value", maxValue, valueChunkFactor, byteOrder, compressionStrategy
+    );
+    CompressedVSizeIndexedV3Writer writer = new CompressedVSizeIndexedV3Writer(offsetWriter, valueWriter);
+    CompressedVSizeIndexedV3Supplier supplierFromIterable = CompressedVSizeIndexedV3Supplier.fromIterable(
+        Iterables.transform(
+            vals, new Function<int[], IndexedInts>()
+            {
+              @Nullable
+              @Override
+              public IndexedInts apply(@Nullable final int[] input)
+              {
+                return new ArrayBasedIndexedInts(input);
+              }
+            }
+        ), offsetChunkFactor, maxValue, byteOrder, compressionStrategy
+    );
+    writer.open();
+    for (int[] val : vals) {
+      writer.add(val);
+    }
+
+    final WritableByteChannel outputChannel = Channels.newChannel(ioPeon.makeOutputStream("output"));
+    long writtenLength = writer.closeAndWriteToChannel(outputChannel);
+    outputChannel.close();
+
+    assertEquals(writtenLength, supplierFromIterable.getSerializedSize());
+
+    // read from ByteBuffer and check values
+    CompressedVSizeIndexedV3Supplier supplierFromByteBuffer = CompressedVSizeIndexedV3Supplier.fromByteBuffer(
+        ByteBuffer.wrap(IOUtils.toByteArray(ioPeon.makeInputStream("output"))), byteOrder
+    );
+    IndexedMultivalue<IndexedInts> indexedMultivalue = supplierFromByteBuffer.get();
+    assertEquals(indexedMultivalue.size(), vals.size());
+    for (int i = 0; i < vals.size(); ++i) {
+      IndexedInts subVals = indexedMultivalue.get(i);
+      assertEquals(subVals.size(), vals.get(i).length);
+      for (int j = 0; j < subVals.size(); ++j) {
+        assertEquals(subVals.get(j), vals.get(i)[j]);
+      }
+    }
+    CloseQuietly.close(indexedMultivalue);
+  }
+
+  int getMaxValue(final List<int[]> vals)
+  {
+    return Ordering.natural().max(
+        Iterables.transform(
+            vals, new Function<int[], Integer>()
+            {
+              @Nullable
+              @Override
+              public Integer apply(int[] input)
+              {
+                return input.length > 0 ? Ints.max(input) : 0;
+              }
+            }
+        )
+    );
+  }
+
+  @Before
+  public void setUp() throws Exception
+  {
+    vals = null;
+  }
+
+  @After
+  public void tearDown() throws Exception
+  {
+    ioPeon.cleanup();
+  }
+
+  @Test
+  public void testSmallData() throws Exception
+  {
+    // less than one chunk
+    for (int offsetChunk : OFFSET_CHUNK_FACTORS) {
+      for (int maxValue : MAX_VALUES) {
+        final int valueChunk = CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForValue(maxValue);
+        generateVals(rand.nextInt(valueChunk), maxValue);
+        checkSerializedSizeAndData(offsetChunk, valueChunk);
+      }
+    }
+  }
+
+  @Test
+  public void testLargeData() throws Exception
+  {
+    // more than one chunk
+    for (int offsetChunk : OFFSET_CHUNK_FACTORS) {
+      for (int maxValue : MAX_VALUES) {
+        final int valueChunk = CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForValue(maxValue);
+        generateVals((rand.nextInt(2) + 1) * valueChunk + rand.nextInt(valueChunk), maxValue);
+        checkSerializedSizeAndData(offsetChunk, valueChunk);
+      }
+    }
+  }
+
+  @Test
+  public void testEmpty() throws Exception
+  {
+    vals = new ArrayList<>();
+    checkSerializedSizeAndData(1, 2);
+  }
+}
\ No newline at end of file
diff --git a/processing/src/test/java/io/druid/segment/data/CompressedVSizeIntsIndexedWriterTest.java b/processing/src/test/java/io/druid/segment/data/CompressedVSizeIntsIndexedWriterTest.java
new file mode 100644
index 00000000000..5863428ef4b
--- /dev/null
+++ b/processing/src/test/java/io/druid/segment/data/CompressedVSizeIntsIndexedWriterTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.segment.data;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Ints;
+import com.metamx.common.guava.CloseQuietly;
+import org.apache.commons.io.IOUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class CompressedVSizeIntsIndexedWriterTest
+{
+  @Parameterized.Parameters(name = "{index}: compression={0}, byteOrder={1}")
+  public static Iterable<Object[]> compressionStrategiesAndByteOrders()
+  {
+    Set<List<Object>> combinations = Sets.cartesianProduct(
+        Sets.newHashSet(CompressedObjectStrategy.CompressionStrategy.values()),
+        Sets.newHashSet(ByteOrder.BIG_ENDIAN, ByteOrder.LITTLE_ENDIAN)
+    );
+
+    return Iterables.transform(
+        combinations, new Function<List<Object>, Object[]>()
+        {
+          @Override
+          public Object[] apply(List<Object> input)
+          {
+            return new Object[]{input.get(0), input.get(1)};
+          }
+        }
+    );
+  }
+
+  private static final int[] MAX_VALUES = new int[]{0xFF, 0xFFFF, 0xFFFFFF, 0x0FFFFFFF};
+
+  private final IOPeon ioPeon = new TmpFileIOPeon();
+  private final CompressedObjectStrategy.CompressionStrategy compressionStrategy;
+  private final ByteOrder byteOrder;
+  private final Random rand = new Random(0);
+  private int[] vals;
+
+  public CompressedVSizeIntsIndexedWriterTest(
+      CompressedObjectStrategy.CompressionStrategy compressionStrategy,
+      ByteOrder byteOrder
+  )
+  {
+    this.compressionStrategy = compressionStrategy;
+    this.byteOrder = byteOrder;
+  }
+
+  @Before
+  public void setUp() throws Exception
+  {
+    vals = null;
+  }
+
+  @After
+  public void tearDown() throws Exception
+  {
+    ioPeon.cleanup();
+  }
+
+  private void generateVals(final int totalSize, final int maxValue) throws IOException
+  {
+    vals = new int[totalSize];
+    for (int i = 0; i < vals.length; ++i) {
+      vals[i] = rand.nextInt(maxValue);
+    }
+  }
+
+  private void checkSerializedSizeAndData(int chunkSize) throws Exception
+  {
+    CompressedVSizeIntsIndexedWriter writer = new CompressedVSizeIntsIndexedWriter(
+        ioPeon, "test", vals.length > 0 ? Ints.max(vals) : 0, chunkSize, byteOrder, compressionStrategy
+    );
+    CompressedVSizeIntsIndexedSupplier supplierFromList = CompressedVSizeIntsIndexedSupplier.fromList(
+        Ints.asList(vals), vals.length > 0 ? Ints.max(vals) : 0, chunkSize, byteOrder, compressionStrategy
+    );
+    writer.open();
+    for (int val : vals) {
+      writer.add(val);
+    }
+    final WritableByteChannel outputChannel = Channels.newChannel(ioPeon.makeOutputStream("output"));
+    long writtenLength = writer.closeAndWriteToChannel(outputChannel);
+    outputChannel.close();
+
+    assertEquals(writtenLength, supplierFromList.getSerializedSize());
+
+    // read from ByteBuffer and check values
+    CompressedVSizeIntsIndexedSupplier supplierFromByteBuffer = CompressedVSizeIntsIndexedSupplier.fromByteBuffer(
+        ByteBuffer.wrap(IOUtils.toByteArray(ioPeon.makeInputStream("output"))), byteOrder
+    );
+    IndexedInts indexedInts = supplierFromByteBuffer.get();
+    for (int i = 0; i < vals.length; ++i) {
+      assertEquals(vals[i], indexedInts.get(i));
+    }
+    CloseQuietly.close(indexedInts);
+  }
+
+  @Test
+  public void testSmallData() throws Exception
+  {
+    // less than one chunk
+    for (int maxValue : MAX_VALUES) {
+      final int maxChunkSize = CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForValue(maxValue);
+      generateVals(rand.nextInt(maxChunkSize), maxValue);
+      checkSerializedSizeAndData(maxChunkSize);
+    }
+  }
+
+  @Test
+  public void testLargeData() throws Exception
+  {
+    // more than one chunk
+    for (int maxValue : MAX_VALUES) {
+      final int maxChunkSize = CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForValue(maxValue);
+      generateVals((rand.nextInt(5) + 5) * maxChunkSize + rand.nextInt(maxChunkSize), maxValue);
+      checkSerializedSizeAndData(maxChunkSize);
+    }
+  }
+
+  @Test
+  public void testEmpty() throws Exception
+  {
+    vals = new int[0];
+    checkSerializedSizeAndData(2);
+  }
+}
\ No newline at end of file
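
Putting the pieces together, writing a V3 column end to end looks roughly like this (a sketch distilled from CompressedVSizeIndexedV3WriterTest above; the "offset"/"value"/"output" names are just illustrative IOPeon keys, and maxValue = 20 is a stand-in for the real column cardinality):

    IOPeon ioPeon = new TmpFileIOPeon();
    int maxValue = 20;

    CompressedVSizeIndexedV3Writer writer = new CompressedVSizeIndexedV3Writer(
        new CompressedIntsIndexedWriter(
            ioPeon, "offset",
            CompressedIntsIndexedSupplier.MAX_INTS_IN_BUFFER,
            ByteOrder.nativeOrder(),
            CompressedObjectStrategy.CompressionStrategy.LZ4
        ),
        new CompressedVSizeIntsIndexedWriter(
            ioPeon, "value", maxValue,
            CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForValue(maxValue),
            ByteOrder.nativeOrder(),
            CompressedObjectStrategy.CompressionStrategy.LZ4
        )
    );

    writer.open();
    writer.add(new int[]{1, 2, 3}); // one multi-value row per call
    writer.add(new int[]{4, 5});

    WritableByteChannel channel = Channels.newChannel(ioPeon.makeOutputStream("output"));
    // the returned length matches CompressedVSizeIndexedV3Supplier.getSerializedSize()
    long serializedSize = writer.closeAndWriteToChannel(channel);
    channel.close();
    ioPeon.cleanup();
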