From d20128b89bd04a0c32a20185a410aacbbb023571 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Mon, 13 Apr 2015 21:57:19 -0700 Subject: [PATCH] add compressed variable-size ints column type --- .../CompressedByteBufferObjectStrategy.java | 71 ++++ .../CompressedVSizeIntsIndexedSupplier.java | 395 ++++++++++++++++++ ...ompressedVSizeIntsIndexedSupplierTest.java | 371 ++++++++++++++++ 3 files changed, 837 insertions(+) create mode 100644 processing/src/main/java/io/druid/segment/data/CompressedByteBufferObjectStrategy.java create mode 100644 processing/src/main/java/io/druid/segment/data/CompressedVSizeIntsIndexedSupplier.java create mode 100644 processing/src/test/java/io/druid/segment/data/CompressedVSizeIntsIndexedSupplierTest.java diff --git a/processing/src/main/java/io/druid/segment/data/CompressedByteBufferObjectStrategy.java b/processing/src/main/java/io/druid/segment/data/CompressedByteBufferObjectStrategy.java new file mode 100644 index 00000000000..e7fc0433d8a --- /dev/null +++ b/processing/src/main/java/io/druid/segment/data/CompressedByteBufferObjectStrategy.java @@ -0,0 +1,71 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.data; + +import com.google.common.collect.Ordering; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +public class CompressedByteBufferObjectStrategy extends FixedSizeCompressedObjectStrategy +{ + public static final Ordering ORDERING = Ordering.natural().nullsFirst(); + + public static CompressedByteBufferObjectStrategy getBufferForOrder(final ByteOrder order, final CompressionStrategy compression, final int sizePer) + { + return new CompressedByteBufferObjectStrategy(order, compression, sizePer); + } + + public CompressedByteBufferObjectStrategy( + ByteOrder order, + CompressionStrategy compression, + final int sizePer + ) + { + super( + order, new BufferConverter() + { + @Override + public ByteBuffer convert(ByteBuffer buf) + { + return buf; + } + + @Override + public int compare(ByteBuffer lhs, ByteBuffer rhs) + { + return ORDERING.compare(lhs, rhs); + } + + @Override + public int sizeOf(int count) + { + return count; // 1 byte per element + } + + @Override + public ByteBuffer combine(ByteBuffer into, ByteBuffer from) + { + return into.put(from); + } + }, compression, sizePer + ); + } +} diff --git a/processing/src/main/java/io/druid/segment/data/CompressedVSizeIntsIndexedSupplier.java b/processing/src/main/java/io/druid/segment/data/CompressedVSizeIntsIndexedSupplier.java new file mode 100644 index 00000000000..0509e3a55ae --- /dev/null +++ b/processing/src/main/java/io/druid/segment/data/CompressedVSizeIntsIndexedSupplier.java @@ -0,0 +1,395 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.data; + +import com.google.common.base.Preconditions; +import com.google.common.io.Closeables; +import com.google.common.primitives.Ints; +import com.google.common.primitives.Shorts; +import com.metamx.common.IAE; +import com.metamx.common.guava.CloseQuietly; +import io.druid.collections.ResourceHolder; +import io.druid.collections.StupidResourceHolder; +import io.druid.segment.CompressedPools; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.IntBuffer; +import java.nio.ShortBuffer; +import java.nio.channels.WritableByteChannel; +import java.util.Iterator; +import java.util.List; + +public class CompressedVSizeIntsIndexedSupplier implements WritableSupplier +{ + public static final byte version = 0x2; + + private final int totalSize; + private final int sizePer; + private final int numBytes; + private final int bigEndianShift; + private final int littleEndianMask; + private final GenericIndexed> baseBuffers; + private final CompressedObjectStrategy.CompressionStrategy compression; + + CompressedVSizeIntsIndexedSupplier( + int totalSize, + int sizePer, + int numBytes, + GenericIndexed> baseBuffers, + CompressedObjectStrategy.CompressionStrategy compression + ) + { + Preconditions.checkArgument( + sizePer == (1 << Integer.numberOfTrailingZeros(sizePer)), + "Number of entries per chunk must be a power of 2" + ); + + this.totalSize = totalSize; + this.sizePer = sizePer; + this.baseBuffers = baseBuffers; + this.compression = compression; + this.numBytes = numBytes; + this.bigEndianShift = Integer.SIZE - (numBytes << 3); // numBytes * 8 + this.littleEndianMask = (int)((1L << (numBytes << 3)) - 1); // set numBytes * 8 lower bits to 1 + } + + public static int maxIntsInBufferForBytes(int numBytes) + { + int maxSizePer = (CompressedPools.BUFFER_SIZE - bufferPadding(numBytes)) / numBytes; + // round down to the nearest power of 2 + return 1 << (Integer.SIZE - 1 - Integer.numberOfLeadingZeros(maxSizePer)); + } + + private static int bufferPadding(int numBytes) + { + // when numBytes == 3 we need to pad the buffer to allow reading an extra byte + // beyond the end of the last value, since we use buffer.getInt() to read values. + // for numBytes 1, 2 we remove the need for padding by reading bytes or shorts directly. + switch (numBytes) { + case Shorts.BYTES: + case 1: + return 0; + default: + return Ints.BYTES - numBytes; + } + } + + public static int maxIntsInBufferForValue(int maxValue) + { + return maxIntsInBufferForBytes(VSizeIndexedInts.getNumBytesForMax(maxValue)); + } + + public int size() + { + return totalSize; + } + + @Override + public IndexedInts get() + { + // optimized versions for int, short, and byte columns + if(numBytes == Ints.BYTES) { + return new CompressedFullSizeIndexedInts(); + } else if (numBytes == Shorts.BYTES) { + return new CompressedShortSizeIndexedInts(); + } else if (numBytes == 1) { + return new CompressedByteSizeIndexedInts(); + } else { + // default version of everything else, i.e. 3-bytes per value + return new CompressedVSizeIndexedInts(); + } + } + + + public long getSerializedSize() + { + return 1 + // version + 1 + // numBytes + Ints.BYTES + // totalSize + Ints.BYTES + // sizePer + 1 + // compression id + baseBuffers.getSerializedSize(); // data + } + + public void writeToChannel(WritableByteChannel channel) throws IOException + { + channel.write(ByteBuffer.wrap(new byte[]{version, (byte) numBytes})); + channel.write(ByteBuffer.wrap(Ints.toByteArray(totalSize))); + channel.write(ByteBuffer.wrap(Ints.toByteArray(sizePer))); + channel.write(ByteBuffer.wrap(new byte[]{compression.getId()})); + baseBuffers.writeToChannel(channel); + } + + /** + * For testing. Do not use unless you like things breaking + */ + GenericIndexed> getBaseBuffers() + { + return baseBuffers; + } + + public static CompressedVSizeIntsIndexedSupplier fromByteBuffer(ByteBuffer buffer, ByteOrder order) + { + byte versionFromBuffer = buffer.get(); + + if (versionFromBuffer == version) { + final int numBytes = buffer.get(); + final int totalSize = buffer.getInt(); + final int sizePer = buffer.getInt(); + final int chunkBytes = sizePer * numBytes + bufferPadding(numBytes); + + final CompressedObjectStrategy.CompressionStrategy compression = CompressedObjectStrategy.CompressionStrategy.forId(buffer.get()); + + return new CompressedVSizeIntsIndexedSupplier( + totalSize, + sizePer, + numBytes, + GenericIndexed.read( + buffer, + CompressedByteBufferObjectStrategy.getBufferForOrder(order, compression, chunkBytes) + ), + compression + ); + + } + + throw new IAE("Unknown version[%s]", versionFromBuffer); + } + + public static CompressedVSizeIntsIndexedSupplier fromList( + final List list, final int maxValue, final int chunkFactor, final ByteOrder byteOrder, CompressedObjectStrategy.CompressionStrategy compression + ) + { + final int numBytes = VSizeIndexedInts.getNumBytesForMax(maxValue); + final int chunkBytes = chunkFactor * numBytes + bufferPadding(numBytes); + + Preconditions.checkArgument( + chunkFactor <= maxIntsInBufferForBytes(numBytes), + "Chunks must be <= 64k bytes. chunkFactor was[%s]", + chunkFactor + ); + + return new CompressedVSizeIntsIndexedSupplier( + list.size(), + chunkFactor, + numBytes, + GenericIndexed.fromIterable( + new Iterable>() + { + @Override + public Iterator> iterator() + { + return new Iterator>() + { + int position = 0; + + @Override + public boolean hasNext() + { + return position < list.size(); + } + + @Override + public ResourceHolder next() + { + ByteBuffer retVal = ByteBuffer + .allocate(chunkBytes) + .order(byteOrder); + + if (chunkFactor > list.size() - position) { + retVal.limit((list.size() - position) * numBytes); + } else { + retVal.limit(chunkFactor * numBytes); + } + + final List ints = list.subList(position, position + retVal.remaining() / numBytes); + final ByteBuffer buf = ByteBuffer + .allocate(Ints.BYTES) + .order(byteOrder); + final boolean bigEndian = byteOrder.equals(ByteOrder.BIG_ENDIAN); + for (int value : ints) { + buf.putInt(0, value); + if (bigEndian) { + retVal.put(buf.array(), Ints.BYTES - numBytes, numBytes); + } else { + retVal.put(buf.array(), 0, numBytes); + } + } + retVal.rewind(); + position += retVal.remaining() / numBytes; + + return StupidResourceHolder.create(retVal); + } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } + }; + } + }, + CompressedByteBufferObjectStrategy.getBufferForOrder(byteOrder, compression, chunkBytes) + ), + compression + ); + } + + private class CompressedFullSizeIndexedInts extends CompressedVSizeIndexedInts { + IntBuffer intBuffer; + + @Override + protected void loadBuffer(int bufferNum) + { + super.loadBuffer(bufferNum); + intBuffer = buffer.asIntBuffer(); + } + + @Override + protected int _get(int index) + { + return intBuffer.get(intBuffer.position() + index); + } + } + + private class CompressedShortSizeIndexedInts extends CompressedVSizeIndexedInts { + ShortBuffer shortBuffer; + + @Override + protected void loadBuffer(int bufferNum) + { + super.loadBuffer(bufferNum); + shortBuffer = buffer.asShortBuffer(); + } + + @Override + protected int _get(int index) + { + // removes the need for padding + return shortBuffer.get(shortBuffer.position() + index) & 0xFFFF; + } + } + + private class CompressedByteSizeIndexedInts extends CompressedVSizeIndexedInts { + @Override + protected int _get(int index) + { + // removes the need for padding + return buffer.get(buffer.position() + index) & 0xFF; + } + } + + private class CompressedVSizeIndexedInts implements IndexedInts + { + final Indexed> singleThreadedBuffers = baseBuffers.singleThreaded(); + + final int div = Integer.numberOfTrailingZeros(sizePer); + final int rem = sizePer - 1; + + int currIndex = -1; + ResourceHolder holder; + ByteBuffer buffer; + boolean bigEndian; + + @Override + public int size() + { + return totalSize; + } + + /** + * Returns the value at the given index into the column. + * + * Assumes the number of entries in each decompression buffers is a power of two. + * + * @param index index of the value in the column + * @return the value at the given index + */ + @Override + public int get(int index) + { + // assumes the number of entries in each buffer is a power of 2 + final int bufferNum = index >> div; + + if (bufferNum != currIndex) { + loadBuffer(bufferNum); + } + + return _get(index & rem); + } + + /** + * Returns the value at the given index in the current decompression buffer + * + * @param index index of the value in the curent buffer + * @return the value at the given index + */ + protected int _get(final int index) + { + final int pos = buffer.position() + index * numBytes; + // example for numBytes = 3 + // big-endian: 0x000c0b0a stored 0c 0b 0a XX, read 0x0c0b0aXX >>> 8 + // little-endian: 0x000c0b0a stored 0a 0b 0c XX, read 0xXX0c0b0a & 0x00FFFFFF + return bigEndian ? + buffer.getInt(pos) >>> bigEndianShift : + buffer.getInt(pos) & littleEndianMask; + } + + @Override + public Iterator iterator() + { + return new IndexedIntsIterator(this); + } + + @Override + public void fill(int index, int[] toFill) + { + throw new UnsupportedOperationException("fill not supported"); + } + + protected void loadBuffer(int bufferNum) + { + CloseQuietly.close(holder); + holder = singleThreadedBuffers.get(bufferNum); + buffer = holder.get(); + currIndex = bufferNum; + bigEndian = buffer.order().equals(ByteOrder.BIG_ENDIAN); + } + + @Override + public String toString() + { + return "CompressedVSizedIntsIndexedSupplier{" + + "currIndex=" + currIndex + + ", sizePer=" + sizePer + + ", numChunks=" + singleThreadedBuffers.size() + + ", totalSize=" + totalSize + + '}'; + } + + @Override + public void close() throws IOException + { + Closeables.close(holder, false); + } + } +} diff --git a/processing/src/test/java/io/druid/segment/data/CompressedVSizeIntsIndexedSupplierTest.java b/processing/src/test/java/io/druid/segment/data/CompressedVSizeIntsIndexedSupplierTest.java new file mode 100644 index 00000000000..4c897e44663 --- /dev/null +++ b/processing/src/test/java/io/druid/segment/data/CompressedVSizeIntsIndexedSupplierTest.java @@ -0,0 +1,371 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.data; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; +import com.metamx.common.guava.CloseQuietly; +import io.druid.segment.CompressedPools; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.Channels; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +@RunWith(Parameterized.class) +public class CompressedVSizeIntsIndexedSupplierTest extends CompressionStrategyTest +{ + @Parameterized.Parameters(name = "{index}: compression={0}, byteOrder={1}") + public static Iterable compressionStrategies() + { + final Iterable compressionStrategies = Iterables.transform( + CompressionStrategyTest.compressionStrategies(), + new Function() + { + @Override + public CompressedObjectStrategy.CompressionStrategy apply(Object[] input) + { + return (CompressedObjectStrategy.CompressionStrategy) input[0]; + } + } + ); + + Set> combinations = Sets.cartesianProduct( + Sets.newHashSet(compressionStrategies), + Sets.newHashSet(ByteOrder.BIG_ENDIAN, ByteOrder.LITTLE_ENDIAN) + ); + + return Iterables.transform( + combinations, new Function() + { + @Override + public Object[] apply(List input) + { + return new Object[]{input.get(0), input.get(1)}; + } + } + ); + } + + private static final int[] MAX_VALUES = new int[] { 0xFF, 0xFFFF, 0xFFFFFF, 0x0FFFFFFF }; + + public CompressedVSizeIntsIndexedSupplierTest(CompressedObjectStrategy.CompressionStrategy compressionStrategy, ByteOrder byteOrder) + { + super(compressionStrategy); + this.byteOrder = byteOrder; + } + + private IndexedInts indexed; + private CompressedVSizeIntsIndexedSupplier supplier; + private int[] vals; + private final ByteOrder byteOrder; + + + @Before + public void setUp() throws Exception + { + CloseQuietly.close(indexed); + indexed = null; + supplier = null; + vals = null; + } + + @After + public void tearDown() throws Exception + { + CloseQuietly.close(indexed); + } + + private void setupSimple(final int chunkSize) + { + CloseQuietly.close(indexed); + + vals = new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16}; + + supplier = CompressedVSizeIntsIndexedSupplier.fromList( + Ints.asList(vals), + Ints.max(vals), + chunkSize, + ByteOrder.nativeOrder(), + compressionStrategy + ); + + indexed = supplier.get(); + } + + private void setupSimpleWithSerde(final int chunkSize) throws IOException + { + vals = new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16}; + + makeWithSerde(chunkSize); + } + + private void makeWithSerde(final int chunkSize) throws IOException + { + CloseQuietly.close(indexed); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final CompressedVSizeIntsIndexedSupplier theSupplier = CompressedVSizeIntsIndexedSupplier.fromList( + Ints.asList(vals), Ints.max(vals), chunkSize, byteOrder, compressionStrategy + ); + theSupplier.writeToChannel(Channels.newChannel(baos)); + + final byte[] bytes = baos.toByteArray(); + Assert.assertEquals(theSupplier.getSerializedSize(), bytes.length); + + supplier = CompressedVSizeIntsIndexedSupplier.fromByteBuffer(ByteBuffer.wrap(bytes), byteOrder); + indexed = supplier.get(); + } + + private void setupLargeChunks(final int chunkSize, final int totalSize, final int maxValue) throws IOException + { + vals = new int[totalSize]; + Random rand = new Random(0); + for(int i = 0; i < vals.length; ++i) { + // VSizeIndexed only allows positive values + vals[i] = rand.nextInt(maxValue); + } + + makeWithSerde(chunkSize); + } + + @Test + public void testSanity() throws Exception + { + setupSimple(2); + Assert.assertEquals(8, supplier.getBaseBuffers().size()); + assertIndexMatchesVals(); + + setupSimple(4); + Assert.assertEquals(4, supplier.getBaseBuffers().size()); + assertIndexMatchesVals(); + + setupSimple(32); + Assert.assertEquals(1, supplier.getBaseBuffers().size()); + assertIndexMatchesVals(); + } + + @Test + public void testLargeChunks() throws Exception + { + for (int maxValue : MAX_VALUES) { + final int maxChunkSize = CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForValue(maxValue); + + setupLargeChunks(maxChunkSize, 10 * maxChunkSize, maxValue); + Assert.assertEquals(10, supplier.getBaseBuffers().size()); + assertIndexMatchesVals(); + + setupLargeChunks(maxChunkSize, 10 * maxChunkSize + 1, maxValue); + Assert.assertEquals(11, supplier.getBaseBuffers().size()); + assertIndexMatchesVals(); + + setupLargeChunks(1, 0xFFFF, maxValue); + Assert.assertEquals(0xFFFF, supplier.getBaseBuffers().size()); + assertIndexMatchesVals(); + + setupLargeChunks(maxChunkSize / 2, 10 * (maxChunkSize / 2) + 1, maxValue); + Assert.assertEquals(11, supplier.getBaseBuffers().size()); + assertIndexMatchesVals(); + } + } + + @Test + public void testChunkTooBig() throws Exception + { + for(int maxValue : MAX_VALUES) { + final int maxChunkSize = CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForValue(maxValue); + try { + setupLargeChunks(maxChunkSize + 1, 10 * (maxChunkSize + 1), maxValue); + Assert.fail(); + } catch(IllegalArgumentException e) { + Assert.assertTrue("chunk too big for maxValue " + maxValue, true); + } + } + } + + @Test + public void testmaxIntsInBuffer() throws Exception + { + Assert.assertEquals(CompressedPools.BUFFER_SIZE, CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForBytes(1)); + Assert.assertEquals(CompressedPools.BUFFER_SIZE / 2, CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForBytes(2)); + Assert.assertEquals(CompressedPools.BUFFER_SIZE / 4, CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForBytes(4)); + + Assert.assertEquals(CompressedPools.BUFFER_SIZE, 0x10000); // nearest power of 2 is 2^14 + Assert.assertEquals(1 << 14, CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForBytes(3)); + } + + @Test + public void testSanityWithSerde() throws Exception + { + setupSimpleWithSerde(4); + + Assert.assertEquals(4, supplier.getBaseBuffers().size()); + assertIndexMatchesVals(); + + setupSimpleWithSerde(2); + + Assert.assertEquals(8, supplier.getBaseBuffers().size()); + assertIndexMatchesVals(); + } + + + // This test attempts to cause a race condition with the DirectByteBuffers, it's non-deterministic in causing it, + // which sucks but I can't think of a way to deterministically cause it... + @Test + public void testConcurrentThreadReads() throws Exception + { + setupSimple(4); + + final AtomicReference reason = new AtomicReference<>("none"); + + final int numRuns = 1000; + final CountDownLatch startLatch = new CountDownLatch(1); + final CountDownLatch stopLatch = new CountDownLatch(2); + final AtomicBoolean failureHappened = new AtomicBoolean(false); + new Thread(new Runnable() + { + @Override + public void run() + { + try { + startLatch.await(); + } + catch (InterruptedException e) { + failureHappened.set(true); + reason.set("interrupt."); + stopLatch.countDown(); + return; + } + + try { + for (int i = 0; i < numRuns; ++i) { + for (int j = 0; j < indexed.size(); ++j) { + final long val = vals[j]; + final long indexedVal = indexed.get(j); + if (Longs.compare(val, indexedVal) != 0) { + failureHappened.set(true); + reason.set(String.format("Thread1[%d]: %d != %d", j, val, indexedVal)); + stopLatch.countDown(); + return; + } + } + } + } + catch (Exception e) { + e.printStackTrace(); + failureHappened.set(true); + reason.set(e.getMessage()); + } + + stopLatch.countDown(); + } + }).start(); + + final IndexedInts indexed2 = supplier.get(); + try { + new Thread(new Runnable() + { + @Override + public void run() + { + try { + startLatch.await(); + } + catch (InterruptedException e) { + stopLatch.countDown(); + return; + } + + try { + for (int i = 0; i < numRuns; ++i) { + for (int j = indexed2.size() - 1; j >= 0; --j) { + final long val = vals[j]; + final long indexedVal = indexed2.get(j); + if (Longs.compare(val, indexedVal) != 0) { + failureHappened.set(true); + reason.set(String.format("Thread2[%d]: %d != %d", j, val, indexedVal)); + stopLatch.countDown(); + return; + } + } + } + } + catch (Exception e) { + e.printStackTrace(); + reason.set(e.getMessage()); + failureHappened.set(true); + } + + stopLatch.countDown(); + } + }).start(); + + startLatch.countDown(); + + stopLatch.await(); + } + finally { + CloseQuietly.close(indexed2); + } + + if (failureHappened.get()) { + Assert.fail("Failure happened. Reason: " + reason.get()); + } + } + + private void assertIndexMatchesVals() + { + Assert.assertEquals(vals.length, indexed.size()); + + // sequential access + int[] indices = new int[vals.length]; + for (int i = 0; i < indexed.size(); ++i) { + final int expected = vals[i]; + final int actual = indexed.get(i); + Assert.assertEquals(expected, actual); + indices[i] = i; + } + + Collections.shuffle(Arrays.asList(indices)); + // random access + for (int i = 0; i < indexed.size(); ++i) { + int k = indices[i]; + Assert.assertEquals(vals[k], indexed.get(k)); + } + } +}