add some streaming writers

This commit is contained in:
Kurt Young 2015-12-22 12:15:29 +08:00
parent d7ad93debc
commit bb50d2a2b2
10 changed files with 1082 additions and 13 deletions

View File

@ -0,0 +1,138 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
import com.metamx.common.IAE;
import io.druid.segment.data.CompressedIntsIndexedSupplier;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.CompressedVSizeIntsIndexedSupplier;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.IndexedMultivalue;
import io.druid.segment.data.WritableSupplier;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
* The format is mostly the same with CompressedVSizeIndexedSupplier(which has version 0x2, so we call it V2),
* the only difference is V3's offsets is not VSize encoded, it's just compressed.
* The reason we provide this is we can streams the data out in the binary format with CompressedVSizeIndexedV3Writer.
* If we want to streams VSizeInts, we must know the max value in the value sets. It's easy to know the max id of
* values(like dimension cardinality while encoding dimension), but difficult to known the max id of offsets.
*/
public class CompressedVSizeIndexedV3Supplier implements WritableSupplier<IndexedMultivalue<IndexedInts>>
{
public static final byte version = 0x3;
private final CompressedIntsIndexedSupplier offsetSupplier;
private final CompressedVSizeIntsIndexedSupplier valueSupplier;
CompressedVSizeIndexedV3Supplier(
CompressedIntsIndexedSupplier offsetSupplier,
CompressedVSizeIntsIndexedSupplier valueSupplier
)
{
this.offsetSupplier = offsetSupplier;
this.valueSupplier = valueSupplier;
}
public static CompressedVSizeIndexedV3Supplier fromByteBuffer(ByteBuffer buffer, ByteOrder order)
{
byte versionFromBuffer = buffer.get();
if (versionFromBuffer == version) {
CompressedIntsIndexedSupplier offsetSupplier = CompressedIntsIndexedSupplier.fromByteBuffer(
buffer,
order
);
CompressedVSizeIntsIndexedSupplier valueSupplier = CompressedVSizeIntsIndexedSupplier.fromByteBuffer(
buffer,
order
);
return new CompressedVSizeIndexedV3Supplier(offsetSupplier, valueSupplier);
}
throw new IAE("Unknown version[%s]", versionFromBuffer);
}
// for test
public static CompressedVSizeIndexedV3Supplier fromIterable(
Iterable<IndexedInts> objectsIterable,
int offsetChunkFactor,
int maxValue,
final ByteOrder byteOrder,
CompressedObjectStrategy.CompressionStrategy compression
)
{
Iterator<IndexedInts> objects = objectsIterable.iterator();
List<Integer> offsetList = new ArrayList<>();
List<Integer> values = new ArrayList<>();
int offset = 0;
while (objects.hasNext()) {
IndexedInts next = objects.next();
offsetList.add(offset);
for (int i = 0; i < next.size(); i++) {
values.add(next.get(i));
}
offset += next.size();
}
offsetList.add(offset);
CompressedIntsIndexedSupplier headerSupplier = CompressedIntsIndexedSupplier.fromList(
offsetList,
offsetChunkFactor,
byteOrder,
compression
);
CompressedVSizeIntsIndexedSupplier valuesSupplier = CompressedVSizeIntsIndexedSupplier.fromList(
values,
maxValue,
CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForValue(maxValue),
byteOrder,
compression
);
return new CompressedVSizeIndexedV3Supplier(headerSupplier, valuesSupplier);
}
@Override
public long getSerializedSize()
{
return 1 + offsetSupplier.getSerializedSize() + valueSupplier.getSerializedSize();
}
@Override
public void writeToChannel(WritableByteChannel channel) throws IOException
{
channel.write(ByteBuffer.wrap(new byte[]{version}));
offsetSupplier.writeToChannel(channel);
valueSupplier.writeToChannel(channel);
}
@Override
public IndexedMultivalue<IndexedInts> get()
{
return new CompressedVSizeIndexedSupplier.CompressedVSizeIndexed(offsetSupplier.get(), valueSupplier.get());
}
}

View File

@ -0,0 +1,105 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.data;
import com.google.common.io.ByteStreams;
import com.google.common.primitives.Ints;
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidResourceHolder;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.IntBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
/**
* Streams array of integers out in the binary format described by CompressedIntsIndexedSupplier
*/
public class CompressedIntsIndexedWriter
{
public static final byte version = CompressedIntsIndexedSupplier.version;
private final int chunkFactor;
private final ByteOrder byteOrder;
private final CompressedObjectStrategy.CompressionStrategy compression;
private final GenericIndexedWriter<ResourceHolder<IntBuffer>> flattener;
private IntBuffer endBuffer;
private int numInserted;
public CompressedIntsIndexedWriter(
final IOPeon ioPeon,
final String filenameBase,
final int chunkFactor,
final ByteOrder byteOrder,
final CompressedObjectStrategy.CompressionStrategy compression
)
{
this.chunkFactor = chunkFactor;
this.byteOrder = byteOrder;
this.compression = compression;
this.flattener = new GenericIndexedWriter<>(
ioPeon, filenameBase, CompressedIntBufferObjectStrategy.getBufferForOrder(byteOrder, compression, chunkFactor)
);
this.endBuffer = IntBuffer.allocate(chunkFactor);
this.numInserted = 0;
}
public void open() throws IOException
{
flattener.open();
}
public void add(int val) throws IOException
{
if (!endBuffer.hasRemaining()) {
endBuffer.rewind();
flattener.write(StupidResourceHolder.create(endBuffer));
endBuffer.rewind();
}
endBuffer.put(val);
numInserted++;
}
public long closeAndWriteToChannel(WritableByteChannel channel) throws IOException
{
if (numInserted > 0) {
endBuffer.limit(endBuffer.position());
endBuffer.rewind();
flattener.write(StupidResourceHolder.create(endBuffer));
}
endBuffer = null;
flattener.close();
channel.write(ByteBuffer.wrap(new byte[]{version}));
channel.write(ByteBuffer.wrap(Ints.toByteArray(numInserted)));
channel.write(ByteBuffer.wrap(Ints.toByteArray(chunkFactor)));
channel.write(ByteBuffer.wrap(new byte[]{compression.getId()}));
final ReadableByteChannel from = Channels.newChannel(flattener.combineStreams().getInput());
long dataLen = ByteStreams.copy(from, channel);
return 1 + // version
Ints.BYTES + // numInserted
Ints.BYTES + // chunkFactor
1 + // compression id
dataLen; // data
}
}

View File

@ -0,0 +1,72 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* Streams array of integers out in the binary format described by CompressedVSizeIndexedV3Supplier
*/
package io.druid.segment.data;
import io.druid.segment.CompressedVSizeIndexedV3Supplier;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
public class CompressedVSizeIndexedV3Writer
{
public static final byte version = CompressedVSizeIndexedV3Supplier.version;
private final CompressedIntsIndexedWriter offsetWriter;
private final CompressedVSizeIntsIndexedWriter valueWriter;
private int offset;
public CompressedVSizeIndexedV3Writer(
CompressedIntsIndexedWriter offsetWriter,
CompressedVSizeIntsIndexedWriter valueWriter
)
{
this.offsetWriter = offsetWriter;
this.valueWriter = valueWriter;
this.offset = 0;
}
public void open() throws IOException
{
offsetWriter.open();
valueWriter.open();
}
public void add(int[] vals) throws IOException
{
offsetWriter.add(offset);
for (int val : vals) {
valueWriter.add(val);
}
offset += vals.length;
}
public long closeAndWriteToChannel(WritableByteChannel channel) throws IOException
{
channel.write(ByteBuffer.wrap(new byte[]{version}));
offsetWriter.add(offset);
long offsetLen = offsetWriter.closeAndWriteToChannel(channel);
long dataLen = valueWriter.closeAndWriteToChannel(channel);
return 1 + offsetLen + dataLen;
}
}

View File

@ -79,7 +79,7 @@ public class CompressedVSizeIntsIndexedSupplier implements WritableSupplier<Inde
return 1 << (Integer.SIZE - 1 - Integer.numberOfLeadingZeros(maxSizePer));
}
private static int bufferPadding(int numBytes)
public static int bufferPadding(int numBytes)
{
// when numBytes == 3 we need to pad the buffer to allow reading an extra byte
// beyond the end of the last value, since we use buffer.getInt() to read values.

View File

@ -0,0 +1,119 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.data;
import com.google.common.io.ByteStreams;
import com.google.common.primitives.Ints;
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidResourceHolder;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
/**
* Streams array of integers out in the binary format described by CompressedVSizeIntsIndexedSupplier
*/
public class CompressedVSizeIntsIndexedWriter
{
public static final byte version = CompressedVSizeIntsIndexedSupplier.version;
private final int numBytes;
private final int chunkFactor;
private final int chunkBytes;
private final ByteOrder byteOrder;
private final CompressedObjectStrategy.CompressionStrategy compression;
private final GenericIndexedWriter<ResourceHolder<ByteBuffer>> flattener;
private final ByteBuffer intBuffer;
private ByteBuffer endBuffer;
private int numInserted;
public CompressedVSizeIntsIndexedWriter(
final IOPeon ioPeon,
final String filenameBase,
final int maxValue,
final int chunkFactor,
final ByteOrder byteOrder,
final CompressedObjectStrategy.CompressionStrategy compression
)
{
this.numBytes = VSizeIndexedInts.getNumBytesForMax(maxValue);
this.chunkFactor = chunkFactor;
this.chunkBytes = chunkFactor * numBytes + CompressedVSizeIntsIndexedSupplier.bufferPadding(numBytes);
this.byteOrder = byteOrder;
this.compression = compression;
this.flattener = new GenericIndexedWriter<>(
ioPeon, filenameBase, CompressedByteBufferObjectStrategy.getBufferForOrder(byteOrder, compression, chunkBytes)
);
this.intBuffer = ByteBuffer.allocate(Ints.BYTES).order(byteOrder);
this.endBuffer = ByteBuffer.allocate(chunkBytes).order(byteOrder);
this.endBuffer.limit(numBytes * chunkFactor);
this.numInserted = 0;
}
public void open() throws IOException
{
flattener.open();
}
public void add(int val) throws IOException
{
if (!endBuffer.hasRemaining()) {
endBuffer.rewind();
flattener.write(StupidResourceHolder.create(endBuffer));
endBuffer.rewind();
endBuffer.limit(numBytes * chunkFactor);
}
intBuffer.putInt(0, val);
if (byteOrder.equals(ByteOrder.BIG_ENDIAN)) {
endBuffer.put(intBuffer.array(), Ints.BYTES - numBytes, numBytes);
} else {
endBuffer.put(intBuffer.array(), 0, numBytes);
}
numInserted++;
}
public long closeAndWriteToChannel(WritableByteChannel channel) throws IOException
{
if (numInserted > 0) {
endBuffer.limit(endBuffer.position());
endBuffer.rewind();
flattener.write(StupidResourceHolder.create(endBuffer));
}
endBuffer = null;
flattener.close();
channel.write(ByteBuffer.wrap(new byte[]{version, (byte) numBytes}));
channel.write(ByteBuffer.wrap(Ints.toByteArray(numInserted)));
channel.write(ByteBuffer.wrap(Ints.toByteArray(chunkFactor)));
channel.write(ByteBuffer.wrap(new byte[]{compression.getId()}));
final ReadableByteChannel from = Channels.newChannel(flattener.combineStreams().getInput());
long dataLen = ByteStreams.copy(from, channel);
return 1 + // version
1 + // numBytes
Ints.BYTES + // numInserted
Ints.BYTES + // chunkFactor
1 + // compression id
dataLen; // data
}
}

View File

@ -0,0 +1,77 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.CompressedVSizeIndexedSupplierTest;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.IndexedMultivalue;
import io.druid.segment.data.VSizeIndexedInts;
import io.druid.segment.data.WritableSupplier;
import org.junit.After;
import org.junit.Before;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
public class CompressedVSizeIndexedV3SupplierTest extends CompressedVSizeIndexedSupplierTest
{
@Before
public void setUpSimple(){
vals = Arrays.asList(
new int[1],
new int[]{1, 2, 3, 4, 5},
new int[]{6, 7, 8, 9, 10},
new int[]{11, 12, 13, 14, 15, 16, 17, 18, 19, 20}
);
indexedSupplier = CompressedVSizeIndexedV3Supplier.fromIterable(
Iterables.transform(
vals,
new Function<int[], IndexedInts>()
{
@Override
public IndexedInts apply(int[] input)
{
return VSizeIndexedInts.fromArray(input, 20);
}
}
), 2, 20, ByteOrder.nativeOrder(),
CompressedObjectStrategy.CompressionStrategy.LZ4
);
}
@After
public void teardown(){
indexedSupplier = null;
vals = null;
}
@Override
protected WritableSupplier<IndexedMultivalue<IndexedInts>> fromByteBuffer(ByteBuffer buffer, ByteOrder order)
{
return CompressedVSizeIndexedV3Supplier.fromByteBuffer(
buffer, ByteOrder.nativeOrder()
);
}
}

View File

@ -0,0 +1,166 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.data;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.metamx.common.guava.CloseQuietly;
import org.apache.commons.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.List;
import java.util.Random;
import java.util.Set;
import static org.junit.Assert.assertEquals;
@RunWith(Parameterized.class)
public class CompressedIntsIndexedWriterTest
{
@Parameterized.Parameters(name = "{index}: compression={0}, byteOrder={1}")
public static Iterable<Object[]> compressionStrategiesAndByteOrders()
{
Set<List<Object>> combinations = Sets.cartesianProduct(
Sets.newHashSet(CompressedObjectStrategy.CompressionStrategy.values()),
Sets.newHashSet(ByteOrder.BIG_ENDIAN, ByteOrder.LITTLE_ENDIAN)
);
return Iterables.transform(
combinations, new Function<List, Object[]>()
{
@Override
public Object[] apply(List input)
{
return new Object[]{input.get(0), input.get(1)};
}
}
);
}
private static final int[] MAX_VALUES = new int[]{0xFF, 0xFFFF, 0xFFFFFF, 0x0FFFFFFF};
private static final int[] CHUNK_FACTORS = new int[]{1, 2, 100, CompressedIntsIndexedSupplier.MAX_INTS_IN_BUFFER};
private final IOPeon ioPeon = new TmpFileIOPeon();
private final CompressedObjectStrategy.CompressionStrategy compressionStrategy;
private final ByteOrder byteOrder;
private final Random rand = new Random(0);
private int[] vals;
public CompressedIntsIndexedWriterTest(
CompressedObjectStrategy.CompressionStrategy compressionStrategy,
ByteOrder byteOrder
)
{
this.compressionStrategy = compressionStrategy;
this.byteOrder = byteOrder;
}
@Before
public void setUp() throws Exception
{
vals = null;
}
@After
public void tearDown() throws Exception
{
ioPeon.cleanup();
}
private void generateVals(final int totalSize, final int maxValue) throws IOException
{
vals = new int[totalSize];
for (int i = 0; i < vals.length; ++i) {
vals[i] = rand.nextInt(maxValue);
}
}
private void checkSerializedSizeAndData(int chunkFactor) throws Exception
{
CompressedIntsIndexedWriter writer = new CompressedIntsIndexedWriter(
ioPeon, "test", chunkFactor, byteOrder, compressionStrategy
);
CompressedIntsIndexedSupplier supplierFromList = CompressedIntsIndexedSupplier.fromList(
Ints.asList(vals), chunkFactor, byteOrder, compressionStrategy
);
writer.open();
for (int val : vals) {
writer.add(val);
}
final WritableByteChannel outputChannel = Channels.newChannel(ioPeon.makeOutputStream("output"));
long writtenLength = writer.closeAndWriteToChannel(outputChannel);
outputChannel.close();
assertEquals(writtenLength, supplierFromList.getSerializedSize());
// read from ByteBuffer and check values
CompressedIntsIndexedSupplier supplierFromByteBuffer = CompressedIntsIndexedSupplier.fromByteBuffer(
ByteBuffer.wrap(IOUtils.toByteArray(ioPeon.makeInputStream("output"))), byteOrder
);
IndexedInts indexedInts = supplierFromByteBuffer.get();
assertEquals(vals.length, indexedInts.size());
for (int i = 0; i < vals.length; ++i) {
assertEquals(vals[i], indexedInts.get(i));
}
CloseQuietly.close(indexedInts);
}
@Test
public void testSmallData() throws Exception
{
// less than one chunk
for (int maxValue : MAX_VALUES) {
for (int chunkFactor : CHUNK_FACTORS) {
generateVals(rand.nextInt(chunkFactor), maxValue);
checkSerializedSizeAndData(chunkFactor);
}
}
}
@Test
public void testLargeData() throws Exception
{
// more than one chunk
for (int maxValue : MAX_VALUES) {
for (int chunkFactor : CHUNK_FACTORS) {
generateVals((rand.nextInt(5) + 5) * chunkFactor + rand.nextInt(chunkFactor), maxValue);
checkSerializedSizeAndData(chunkFactor);
}
}
}
@Test
public void testWriteEmpty() throws Exception
{
vals = new int[0];
checkSerializedSizeAndData(2);
}
}

View File

@ -40,12 +40,13 @@ import java.util.List;
*/
public class CompressedVSizeIndexedSupplierTest
{
private List<int[]> vals;
protected List<int[]> vals;
private CompressedVSizeIndexedSupplier indexedSupplier;
protected WritableSupplier<IndexedMultivalue<IndexedInts>> indexedSupplier;
@Before
public void setUpSimple(){
public void setUpSimple()
{
vals = Arrays.asList(
new int[1],
new int[]{1, 2, 3, 4, 5},
@ -70,7 +71,8 @@ public class CompressedVSizeIndexedSupplierTest
}
@After
public void teardown(){
public void teardown()
{
indexedSupplier = null;
vals = null;
}
@ -89,7 +91,7 @@ public class CompressedVSizeIndexedSupplierTest
final byte[] bytes = baos.toByteArray();
Assert.assertEquals(indexedSupplier.getSerializedSize(), bytes.length);
CompressedVSizeIndexedSupplier deserializedIndexed = CompressedVSizeIndexedSupplier.fromByteBuffer(
WritableSupplier<IndexedMultivalue<IndexedInts>> deserializedIndexed = fromByteBuffer(
ByteBuffer.wrap(bytes),
ByteOrder.nativeOrder()
);
@ -98,26 +100,28 @@ public class CompressedVSizeIndexedSupplierTest
}
@Test(expected = IllegalArgumentException.class)
public void testGetInvalidElementInRow(){
public void testGetInvalidElementInRow()
{
indexedSupplier.get().get(3).get(15);
}
@Test
public void testIterators(){
public void testIterators()
{
Iterator<IndexedInts> iterator = indexedSupplier.get().iterator();
int row = 0;
while(iterator.hasNext()){
while (iterator.hasNext()) {
final int[] ints = vals.get(row);
final IndexedInts vSizeIndexedInts = iterator.next();
Assert.assertEquals(ints.length, vSizeIndexedInts.size());
Iterator<Integer> valsIterator = vSizeIndexedInts.iterator();
int j=0;
while(valsIterator.hasNext()){
Assert.assertEquals((Integer)ints[j], valsIterator.next());
int j = 0;
while (valsIterator.hasNext()) {
Assert.assertEquals((Integer) ints[j], valsIterator.next());
j++;
}
row ++;
row++;
}
}
@ -134,4 +138,11 @@ public class CompressedVSizeIndexedSupplierTest
}
}
}
protected WritableSupplier<IndexedMultivalue<IndexedInts>> fromByteBuffer(ByteBuffer buffer, ByteOrder order)
{
return CompressedVSizeIndexedSupplier.fromByteBuffer(
buffer, ByteOrder.nativeOrder()
);
}
}

View File

@ -0,0 +1,219 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.data;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.metamx.common.guava.CloseQuietly;
import io.druid.segment.CompressedVSizeIndexedV3Supplier;
import org.apache.commons.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Set;
import static org.junit.Assert.assertEquals;
@RunWith(Parameterized.class)
public class CompressedVSizeIndexedV3WriterTest
{
@Parameterized.Parameters(name = "{index}: compression={0}, byteOrder={1}")
public static Iterable<Object[]> compressionStrategiesAndByteOrders()
{
Set<List<Object>> combinations = Sets.cartesianProduct(
Sets.newHashSet(CompressedObjectStrategy.CompressionStrategy.values()),
Sets.newHashSet(ByteOrder.BIG_ENDIAN, ByteOrder.LITTLE_ENDIAN)
);
return Iterables.transform(
combinations, new Function<List, Object[]>()
{
@Override
public Object[] apply(List input)
{
return new Object[]{input.get(0), input.get(1)};
}
}
);
}
private static final int[] OFFSET_CHUNK_FACTORS = new int[]{
1,
2,
100,
CompressedIntsIndexedSupplier.MAX_INTS_IN_BUFFER
};
private static final int[] MAX_VALUES = new int[]{0xFF, 0xFFFF, 0xFFFFFF, 0x0FFFFFFF};
private final IOPeon ioPeon = new TmpFileIOPeon();
private final CompressedObjectStrategy.CompressionStrategy compressionStrategy;
private final ByteOrder byteOrder;
private final Random rand = new Random(0);
private List<int[]> vals;
public CompressedVSizeIndexedV3WriterTest(
CompressedObjectStrategy.CompressionStrategy compressionStrategy,
ByteOrder byteOrder
)
{
this.compressionStrategy = compressionStrategy;
this.byteOrder = byteOrder;
}
private void generateVals(final int totalSize, final int maxValue) throws IOException
{
vals = new ArrayList<>(totalSize);
for (int i = 0; i < totalSize; ++i) {
int len = rand.nextInt(2) + 1;
int[] subVals = new int[len];
for (int j = 0; j < len; ++j) {
subVals[j] = rand.nextInt(maxValue);
}
vals.add(subVals);
}
}
private void checkSerializedSizeAndData(int offsetChunkFactor, int valueChunkFactor) throws Exception
{
int maxValue = vals.size() > 0 ? getMaxValue(vals) : 0;
CompressedIntsIndexedWriter offsetWriter = new CompressedIntsIndexedWriter(
ioPeon, "offset", offsetChunkFactor, byteOrder, compressionStrategy
);
CompressedVSizeIntsIndexedWriter valueWriter = new CompressedVSizeIntsIndexedWriter(
ioPeon, "value", maxValue, valueChunkFactor, byteOrder, compressionStrategy
);
CompressedVSizeIndexedV3Writer writer = new CompressedVSizeIndexedV3Writer(offsetWriter, valueWriter);
CompressedVSizeIndexedV3Supplier supplierFromIterable = CompressedVSizeIndexedV3Supplier.fromIterable(
Iterables.transform(
vals, new Function<int[], IndexedInts>()
{
@Nullable
@Override
public IndexedInts apply(@Nullable final int[] input)
{
return new ArrayBasedIndexedInts(input);
}
}
), offsetChunkFactor, maxValue, byteOrder, compressionStrategy
);
writer.open();
for (int[] val : vals) {
writer.add(val);
}
final WritableByteChannel outputChannel = Channels.newChannel(ioPeon.makeOutputStream("output"));
long writtenLength = writer.closeAndWriteToChannel(outputChannel);
outputChannel.close();
assertEquals(writtenLength, supplierFromIterable.getSerializedSize());
// read from ByteBuffer and check values
CompressedVSizeIndexedV3Supplier supplierFromByteBuffer = CompressedVSizeIndexedV3Supplier.fromByteBuffer(
ByteBuffer.wrap(IOUtils.toByteArray(ioPeon.makeInputStream("output"))), byteOrder
);
IndexedMultivalue<IndexedInts> indexedMultivalue = supplierFromByteBuffer.get();
assertEquals(indexedMultivalue.size(), vals.size());
for (int i = 0; i < vals.size(); ++i) {
IndexedInts subVals = indexedMultivalue.get(i);
assertEquals(subVals.size(), vals.get(i).length);
for (int j = 0; j < subVals.size(); ++j) {
assertEquals(subVals.get(j), vals.get(i)[j]);
}
}
CloseQuietly.close(indexedMultivalue);
}
int getMaxValue(final List<int[]> vals)
{
return Ordering.natural().max(
Iterables.transform(
vals, new Function<int[], Integer>()
{
@Nullable
@Override
public Integer apply(int[] input)
{
return input.length > 0 ? Ints.max(input) : 0;
}
}
)
);
}
@Before
public void setUp() throws Exception
{
vals = null;
}
@After
public void tearDown() throws Exception
{
ioPeon.cleanup();
}
@Test
public void testSmallData() throws Exception
{
// less than one chunk
for (int offsetChunk : OFFSET_CHUNK_FACTORS) {
for (int maxValue : MAX_VALUES) {
final int valueChunk = CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForValue(maxValue);
generateVals(rand.nextInt(valueChunk), maxValue);
checkSerializedSizeAndData(offsetChunk, valueChunk);
}
}
}
@Test
public void testLargeData() throws Exception
{
// more than one chunk
for (int offsetChunk : OFFSET_CHUNK_FACTORS) {
for (int maxValue : MAX_VALUES) {
final int valueChunk = CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForValue(maxValue);
generateVals((rand.nextInt(2) + 1) * valueChunk + rand.nextInt(valueChunk), maxValue);
checkSerializedSizeAndData(offsetChunk, valueChunk);
}
}
}
@Test
public void testEmpty() throws Exception
{
vals = new ArrayList<>();
checkSerializedSizeAndData(1, 2);
}
}

View File

@ -0,0 +1,162 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.data;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.metamx.common.guava.CloseQuietly;
import org.apache.commons.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.List;
import java.util.Random;
import java.util.Set;
import static org.junit.Assert.assertEquals;
@RunWith(Parameterized.class)
public class CompressedVSizeIntsIndexedWriterTest
{
@Parameterized.Parameters(name = "{index}: compression={0}, byteOrder={1}")
public static Iterable<Object[]> compressionStrategiesAndByteOrders()
{
Set<List<Object>> combinations = Sets.cartesianProduct(
Sets.newHashSet(CompressedObjectStrategy.CompressionStrategy.values()),
Sets.newHashSet(ByteOrder.BIG_ENDIAN, ByteOrder.LITTLE_ENDIAN)
);
return Iterables.transform(
combinations, new Function<List, Object[]>()
{
@Override
public Object[] apply(List input)
{
return new Object[]{input.get(0), input.get(1)};
}
}
);
}
private static final int[] MAX_VALUES = new int[]{0xFF, 0xFFFF, 0xFFFFFF, 0x0FFFFFFF};
private final IOPeon ioPeon = new TmpFileIOPeon();
private final CompressedObjectStrategy.CompressionStrategy compressionStrategy;
private final ByteOrder byteOrder;
private final Random rand = new Random(0);
private int[] vals;
public CompressedVSizeIntsIndexedWriterTest(
CompressedObjectStrategy.CompressionStrategy compressionStrategy,
ByteOrder byteOrder
)
{
this.compressionStrategy = compressionStrategy;
this.byteOrder = byteOrder;
}
@Before
public void setUp() throws Exception
{
vals = null;
}
@After
public void tearDown() throws Exception
{
ioPeon.cleanup();
}
private void generateVals(final int totalSize, final int maxValue) throws IOException
{
vals = new int[totalSize];
for (int i = 0; i < vals.length; ++i) {
vals[i] = rand.nextInt(maxValue);
}
}
private void checkSerializedSizeAndData(int chunkSize) throws Exception
{
CompressedVSizeIntsIndexedWriter writer = new CompressedVSizeIntsIndexedWriter(
ioPeon, "test", vals.length > 0 ? Ints.max(vals) : 0, chunkSize, byteOrder, compressionStrategy
);
CompressedVSizeIntsIndexedSupplier supplierFromList = CompressedVSizeIntsIndexedSupplier.fromList(
Ints.asList(vals), vals.length > 0 ? Ints.max(vals) : 0, chunkSize, byteOrder, compressionStrategy
);
writer.open();
for (int val : vals) {
writer.add(val);
}
final WritableByteChannel outputChannel = Channels.newChannel(ioPeon.makeOutputStream("output"));
long writtenLength = writer.closeAndWriteToChannel(outputChannel);
outputChannel.close();
assertEquals(writtenLength, supplierFromList.getSerializedSize());
// read from ByteBuffer and check values
CompressedVSizeIntsIndexedSupplier supplierFromByteBuffer = CompressedVSizeIntsIndexedSupplier.fromByteBuffer(
ByteBuffer.wrap(IOUtils.toByteArray(ioPeon.makeInputStream("output"))), byteOrder
);
IndexedInts indexedInts = supplierFromByteBuffer.get();
for (int i = 0; i < vals.length; ++i) {
assertEquals(vals[i], indexedInts.get(i));
}
CloseQuietly.close(indexedInts);
}
@Test
public void testSmallData() throws Exception
{
// less than one chunk
for (int maxValue : MAX_VALUES) {
final int maxChunkSize = CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForValue(maxValue);
generateVals(rand.nextInt(maxChunkSize), maxValue);
checkSerializedSizeAndData(maxChunkSize);
}
}
@Test
public void testLargeData() throws Exception
{
// more than one chunk
for (int maxValue : MAX_VALUES) {
final int maxChunkSize = CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForValue(maxValue);
generateVals((rand.nextInt(5) + 5) * maxChunkSize + rand.nextInt(maxChunkSize), maxValue);
checkSerializedSizeAndData(maxChunkSize);
}
}
@Test
public void testEmpty() throws Exception
{
vals = new int[0];
checkSerializedSizeAndData(2);
}
}