mirror of https://github.com/apache/druid.git
Speed up FrameFileTest, SuperSorterTest. (#17068)
* Speed up FrameFileTest, SuperSorterTest. These are two heavily parameterized tests that, together, account for about 60% of runtime in the test suite. FrameFileTest changes: 1) Cache frame files in a static map, rather than building the frame file for each parameterization of the test. 2) Adjust TestArrayCursorFactory to cache the signature, rather than re-creating it on each call to getColumnCapabilities. SuperSorterTest changes: 1) Dramatically reduce the number of tests that run with "maxRowsPerFrame" = 1. These are particularly slow due to writing so many small files. Some still run, since it's useful to test edge cases, but far fewer than before. 2) Reduce the "maxActiveProcessors" axis of the test from [1, 2, 4] to [1, 3]. The aim is to reduce the number of cases while still getting good coverage of the feature. 3) Reduce the "maxChannelsPerProcessor" axis of the test from [2, 3, 8] to [2, 7]. The aim is to reduce the number of cases while still getting good coverage of the feature. 4) Use in-memory input channels rather than file channels. 5) Defer formatting of assertion failure messages until they are needed. 6) Cache the cursor factory and its signature in static fields. 7) Cache sorted test rows (used for verification) in a static map. * It helps to include the file. * Style.
This commit is contained in:
parent
73a644258d
commit
5b7fb5fbca
|
@ -88,11 +88,12 @@ public class RowKeyComparisonRunLengths
|
|||
);
|
||||
}
|
||||
|
||||
ColumnType columnType = rowSignature.getColumnType(keyColumn.columnName())
|
||||
.orElseThrow(() -> DruidException.defensive("Need column types"));
|
||||
ColumnType columnType =
|
||||
rowSignature.getColumnType(keyColumn.columnName())
|
||||
.orElseThrow(() -> DruidException.defensive("No type for column[%s]", keyColumn.columnName()));
|
||||
|
||||
// First key column to be processed
|
||||
if (runLengthEntryBuilders.size() == 0) {
|
||||
if (runLengthEntryBuilders.isEmpty()) {
|
||||
final boolean isByteComparable = isByteComparable(columnType);
|
||||
runLengthEntryBuilders.add(
|
||||
new RunLengthEntryBuilder(isByteComparable, keyColumn.order())
|
||||
|
|
|
@ -48,9 +48,12 @@ import java.util.List;
|
|||
*/
|
||||
public class TestArrayCursorFactory extends QueryableIndexCursorFactory
|
||||
{
|
||||
private final RowSignature signature;
|
||||
|
||||
public TestArrayCursorFactory(QueryableIndex index)
|
||||
{
|
||||
super(index);
|
||||
this.signature = computeRowSignature(index);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -81,26 +84,10 @@ public class TestArrayCursorFactory extends QueryableIndexCursorFactory
|
|||
};
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public RowSignature getRowSignature()
|
||||
{
|
||||
final RowSignature.Builder builder = RowSignature.builder();
|
||||
builder.addTimeColumn();
|
||||
|
||||
for (final String column : super.getRowSignature().getColumnNames()) {
|
||||
ColumnCapabilities columnCapabilities = super.getColumnCapabilities(column);
|
||||
ColumnType columnType = columnCapabilities == null ? null : columnCapabilities.toColumnType();
|
||||
//change MV strings columns to Array<String>
|
||||
if (columnType != null
|
||||
&& columnType.equals(ColumnType.STRING)
|
||||
&& columnCapabilities.hasMultipleValues().isMaybeTrue()) {
|
||||
columnType = ColumnType.STRING_ARRAY;
|
||||
}
|
||||
builder.add(column, columnType);
|
||||
}
|
||||
|
||||
return builder.build();
|
||||
return signature;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
|
@ -115,6 +102,26 @@ public class TestArrayCursorFactory extends QueryableIndexCursorFactory
|
|||
}
|
||||
}
|
||||
|
||||
private static RowSignature computeRowSignature(final QueryableIndex index)
|
||||
{
|
||||
final RowSignature.Builder builder = RowSignature.builder();
|
||||
builder.addTimeColumn();
|
||||
|
||||
for (final String column : new QueryableIndexCursorFactory(index).getRowSignature().getColumnNames()) {
|
||||
ColumnCapabilities columnCapabilities = index.getColumnCapabilities(column);
|
||||
ColumnType columnType = columnCapabilities == null ? null : columnCapabilities.toColumnType();
|
||||
//change MV strings columns to Array<String>
|
||||
if (columnType != null
|
||||
&& columnType.equals(ColumnType.STRING)
|
||||
&& columnCapabilities.hasMultipleValues().isMaybeTrue()) {
|
||||
columnType = ColumnType.STRING_ARRAY;
|
||||
}
|
||||
builder.add(column, columnType);
|
||||
}
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
private class DecoratedCursor implements Cursor
|
||||
{
|
||||
private final Cursor cursor;
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.druid.segment.column.RowSignature;
|
|||
import org.apache.druid.segment.incremental.IncrementalIndexCursorFactory;
|
||||
import org.apache.druid.testing.InitializedNullHandlingTest;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Assume;
|
||||
import org.junit.Before;
|
||||
|
@ -49,17 +50,28 @@ import org.junit.rules.TemporaryFolder;
|
|||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.math.RoundingMode;
|
||||
import java.nio.file.Files;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class FrameFileTest extends InitializedNullHandlingTest
|
||||
{
|
||||
/**
|
||||
* Static cache of generated frame files, to speed up tests. Cleared in {@link #afterClass()}.
|
||||
*/
|
||||
private static final Map<FrameFileKey, byte[]> FRAME_FILES = new HashMap<>();
|
||||
|
||||
// Partition every 99 rows if "partitioned" is true.
|
||||
private static final int PARTITION_SIZE = 99;
|
||||
|
||||
|
@ -122,6 +134,7 @@ public class FrameFileTest extends InitializedNullHandlingTest
|
|||
};
|
||||
|
||||
abstract CursorFactory getCursorFactory();
|
||||
|
||||
abstract int getRowCount();
|
||||
}
|
||||
|
||||
|
@ -195,38 +208,21 @@ public class FrameFileTest extends InitializedNullHandlingTest
|
|||
{
|
||||
cursorFactory = adapterType.getCursorFactory();
|
||||
rowCount = adapterType.getRowCount();
|
||||
file = temporaryFolder.newFile();
|
||||
|
||||
if (partitioned) {
|
||||
// Partition every PARTITION_SIZE rows.
|
||||
file = FrameTestUtil.writeFrameFileWithPartitions(
|
||||
FrameSequenceBuilder.fromCursorFactory(cursorFactory).frameType(frameType).maxRowsPerFrame(maxRowsPerFrame).frames().map(
|
||||
new Function<Frame, IntObjectPair<Frame>>()
|
||||
{
|
||||
private int rows = 0;
|
||||
|
||||
@Override
|
||||
public IntObjectPair<Frame> apply(final Frame frame)
|
||||
{
|
||||
final int partitionNum = rows / PARTITION_SIZE;
|
||||
rows += frame.numRows();
|
||||
return IntObjectPair.of(
|
||||
partitionNum >= SKIP_PARTITION ? partitionNum + 1 : partitionNum,
|
||||
frame
|
||||
);
|
||||
}
|
||||
}
|
||||
),
|
||||
temporaryFolder.newFile()
|
||||
);
|
||||
|
||||
} else {
|
||||
file = FrameTestUtil.writeFrameFile(
|
||||
FrameSequenceBuilder.fromCursorFactory(cursorFactory).frameType(frameType).maxRowsPerFrame(maxRowsPerFrame).frames(),
|
||||
temporaryFolder.newFile()
|
||||
);
|
||||
try (final OutputStream out = Files.newOutputStream(file.toPath())) {
|
||||
final FrameFileKey frameFileKey = new FrameFileKey(adapterType, frameType, maxRowsPerFrame, partitioned);
|
||||
final byte[] frameFileBytes = FRAME_FILES.computeIfAbsent(frameFileKey, FrameFileTest::computeFrameFile);
|
||||
out.write(frameFileBytes);
|
||||
}
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void afterClass()
|
||||
{
|
||||
FRAME_FILES.clear();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_numFrames() throws IOException
|
||||
{
|
||||
|
@ -414,4 +410,107 @@ public class FrameFileTest extends InitializedNullHandlingTest
|
|||
return FrameTestUtil.readRowsFromCursorFactory(cursorFactory, RowSignature.empty(), false)
|
||||
.accumulate(0, (i, in) -> i + 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns bytes, in frame file format, corresponding to the given {@link FrameFileKey}.
|
||||
*/
|
||||
private static byte[] computeFrameFile(final FrameFileKey frameFileKey)
|
||||
{
|
||||
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
|
||||
try {
|
||||
if (frameFileKey.partitioned) {
|
||||
// Partition every PARTITION_SIZE rows.
|
||||
FrameTestUtil.writeFrameFileWithPartitions(
|
||||
FrameSequenceBuilder.fromCursorFactory(frameFileKey.adapterType.getCursorFactory())
|
||||
.frameType(frameFileKey.frameType)
|
||||
.maxRowsPerFrame(frameFileKey.maxRowsPerFrame)
|
||||
.frames()
|
||||
.map(
|
||||
new Function<Frame, IntObjectPair<Frame>>()
|
||||
{
|
||||
private int rows = 0;
|
||||
|
||||
@Override
|
||||
public IntObjectPair<Frame> apply(final Frame frame)
|
||||
{
|
||||
final int partitionNum = rows / PARTITION_SIZE;
|
||||
rows += frame.numRows();
|
||||
return IntObjectPair.of(
|
||||
partitionNum >= SKIP_PARTITION ? partitionNum + 1 : partitionNum,
|
||||
frame
|
||||
);
|
||||
}
|
||||
}
|
||||
),
|
||||
baos
|
||||
);
|
||||
} else {
|
||||
FrameTestUtil.writeFrameFile(
|
||||
FrameSequenceBuilder.fromCursorFactory(frameFileKey.adapterType.getCursorFactory())
|
||||
.frameType(frameFileKey.frameType)
|
||||
.maxRowsPerFrame(frameFileKey.maxRowsPerFrame)
|
||||
.frames(),
|
||||
baos
|
||||
);
|
||||
}
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
return baos.toByteArray();
|
||||
}
|
||||
|
||||
/**
|
||||
* Key for {@link #FRAME_FILES}, and input to {@link #computeFrameFile(FrameFileKey)}.
|
||||
*/
|
||||
private static class FrameFileKey
|
||||
{
|
||||
final AdapterType adapterType;
|
||||
final FrameType frameType;
|
||||
final int maxRowsPerFrame;
|
||||
final boolean partitioned;
|
||||
|
||||
public FrameFileKey(AdapterType adapterType, FrameType frameType, int maxRowsPerFrame, boolean partitioned)
|
||||
{
|
||||
this.adapterType = adapterType;
|
||||
this.frameType = frameType;
|
||||
this.maxRowsPerFrame = maxRowsPerFrame;
|
||||
this.partitioned = partitioned;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
FrameFileKey that = (FrameFileKey) o;
|
||||
return maxRowsPerFrame == that.maxRowsPerFrame
|
||||
&& partitioned == that.partitioned
|
||||
&& adapterType == that.adapterType
|
||||
&& frameType == that.frameType;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hash(adapterType, frameType, maxRowsPerFrame, partitioned);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "FrameFileKey{" +
|
||||
"adapterType=" + adapterType +
|
||||
", frameType=" + frameType +
|
||||
", maxRowsPerFrame=" + maxRowsPerFrame +
|
||||
", partitioned=" + partitioned +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,13 +29,7 @@ import org.apache.druid.frame.Frame;
|
|||
import org.apache.druid.frame.FrameType;
|
||||
import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
|
||||
import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
|
||||
import org.apache.druid.frame.channel.ByteTracker;
|
||||
import org.apache.druid.frame.channel.ReadableFileFrameChannel;
|
||||
import org.apache.druid.frame.channel.ReadableFrameChannel;
|
||||
import org.apache.druid.frame.channel.WritableFrameChannel;
|
||||
import org.apache.druid.frame.channel.WritableFrameFileChannel;
|
||||
import org.apache.druid.frame.file.FrameFile;
|
||||
import org.apache.druid.frame.file.FrameFileWriter;
|
||||
import org.apache.druid.frame.key.ClusterBy;
|
||||
import org.apache.druid.frame.key.ClusterByPartition;
|
||||
import org.apache.druid.frame.key.ClusterByPartitions;
|
||||
|
@ -47,6 +41,7 @@ import org.apache.druid.frame.key.RowKeyReader;
|
|||
import org.apache.druid.frame.read.FrameReader;
|
||||
import org.apache.druid.frame.testutil.FrameSequenceBuilder;
|
||||
import org.apache.druid.frame.testutil.FrameTestUtil;
|
||||
import org.apache.druid.frame.write.FrameWriters;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.java.util.common.concurrent.Execs;
|
||||
|
@ -62,8 +57,10 @@ import org.apache.druid.testing.InitializedNullHandlingTest;
|
|||
import org.hamcrest.MatcherAssert;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
|
@ -73,12 +70,12 @@ import org.junit.runners.Parameterized;
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.Channels;
|
||||
import java.nio.file.Files;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
|
@ -228,6 +225,15 @@ public class SuperSorterTest
|
|||
@RunWith(Parameterized.class)
|
||||
public static class ParameterizedCasesTest extends InitializedNullHandlingTest
|
||||
{
|
||||
private static CursorFactory CURSOR_FACTORY;
|
||||
private static RowSignature CURSOR_FACTORY_SIGNATURE_WITH_ROW_NUMBER;
|
||||
|
||||
/**
|
||||
* Static cache of sorted versions of the {@link #CURSOR_FACTORY} dataset, to speed up tests.
|
||||
* Cleared in {@link #tearDownClass()}.
|
||||
*/
|
||||
private static final Map<ClusterBy, List<List<Object>>> SORTED_TEST_ROWS = new HashMap<>();
|
||||
|
||||
@Rule
|
||||
public TemporaryFolder temporaryFolder = new TemporaryFolder();
|
||||
|
||||
|
@ -241,7 +247,6 @@ public class SuperSorterTest
|
|||
private final boolean partitionsDeferred;
|
||||
private final long limitHint;
|
||||
|
||||
private CursorFactory cursorFactory;
|
||||
private RowSignature signature;
|
||||
private FrameProcessorExecutor exec;
|
||||
private List<ReadableFrameChannel> inputChannels;
|
||||
|
@ -285,11 +290,12 @@ public class SuperSorterTest
|
|||
{
|
||||
final List<Object[]> constructors = new ArrayList<>();
|
||||
|
||||
for (int maxRowsPerFrame : new int[]{Integer.MAX_VALUE, 50, 1}) {
|
||||
// Add some constructors for testing maxRowsPerFrame > 1. Later on, we'll add some for maxRowsPerFrame = 1.
|
||||
for (int maxRowsPerFrame : new int[]{Integer.MAX_VALUE, 50}) {
|
||||
for (int maxBytesPerFrame : new int[]{20_000, 2_000_000}) {
|
||||
for (int numChannels : new int[]{1, 3}) {
|
||||
for (int maxActiveProcessors : new int[]{1, 2, 4}) {
|
||||
for (int maxChannelsPerProcessor : new int[]{2, 3, 8}) {
|
||||
for (int maxActiveProcessors : new int[]{1, 3}) {
|
||||
for (int maxChannelsPerProcessor : new int[]{2, 7}) {
|
||||
for (int numThreads : new int[]{1, 3}) {
|
||||
for (boolean isComposedStorage : new boolean[]{true, false}) {
|
||||
for (boolean partitionsDeferred : new boolean[]{true, false}) {
|
||||
|
@ -317,16 +323,51 @@ public class SuperSorterTest
|
|||
}
|
||||
}
|
||||
|
||||
// Add some constructors for testing maxRowsPerFrame = 1. This isn't part of the full matrix since it's quite
|
||||
// slow, but we still want to exercise it a bit.
|
||||
for (boolean isComposedStorage : new boolean[]{true, false}) {
|
||||
for (long limitHint : new long[]{SuperSorter.UNLIMITED, 3, 1_000}) {
|
||||
constructors.add(
|
||||
new Object[]{
|
||||
1 /* maxRowsPerFrame */,
|
||||
20_000 /* maxBytesPerFrame */,
|
||||
3 /* numChannels */,
|
||||
2 /* maxActiveProcessors */,
|
||||
3 /* maxChannelsPerProcessor */,
|
||||
1 /* numThreads */,
|
||||
isComposedStorage,
|
||||
false /* partitionsDeferred */,
|
||||
limitHint
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
return constructors;
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpClass()
|
||||
{
|
||||
CURSOR_FACTORY = new QueryableIndexCursorFactory(TestIndex.getNoRollupMMappedTestIndex());
|
||||
CURSOR_FACTORY_SIGNATURE_WITH_ROW_NUMBER =
|
||||
FrameSequenceBuilder.signatureWithRowNumber(CURSOR_FACTORY.getRowSignature());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownClass()
|
||||
{
|
||||
CURSOR_FACTORY = null;
|
||||
CURSOR_FACTORY_SIGNATURE_WITH_ROW_NUMBER = null;
|
||||
SORTED_TEST_ROWS.clear();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp()
|
||||
{
|
||||
exec = new FrameProcessorExecutor(
|
||||
MoreExecutors.listeningDecorator(Execs.multiThreaded(numThreads, getClass().getSimpleName() + "[%d]"))
|
||||
);
|
||||
cursorFactory = new QueryableIndexCursorFactory(TestIndex.getNoRollupMMappedTestIndex());
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -352,15 +393,15 @@ public class SuperSorterTest
|
|||
}
|
||||
|
||||
final FrameSequenceBuilder frameSequenceBuilder =
|
||||
FrameSequenceBuilder.fromCursorFactory(cursorFactory)
|
||||
FrameSequenceBuilder.fromCursorFactory(CURSOR_FACTORY)
|
||||
.maxRowsPerFrame(maxRowsPerFrame)
|
||||
.sortBy(clusterBy.getColumns())
|
||||
.allocator(ArenaMemoryAllocator.create(ByteBuffer.allocate(maxBytesPerFrame)))
|
||||
.frameType(FrameType.ROW_BASED)
|
||||
.populateRowNumber();
|
||||
|
||||
inputChannels = makeFileChannels(frameSequenceBuilder.frames(), temporaryFolder.newFolder(), numChannels);
|
||||
signature = frameSequenceBuilder.signature();
|
||||
inputChannels = makeRoundRobinChannels(frameSequenceBuilder.frames(), numChannels);
|
||||
signature = FrameWriters.sortableSignature(CURSOR_FACTORY_SIGNATURE_WITH_ROW_NUMBER, clusterBy.getColumns());
|
||||
frameReader = FrameReader.create(signature);
|
||||
}
|
||||
|
||||
|
@ -411,7 +452,7 @@ public class SuperSorterTest
|
|||
Assert.assertEquals(clusterByPartitions.size(), outputChannels.getAllChannels().size());
|
||||
Assert.assertEquals(Double.valueOf(1.0), superSorterProgressTracker.snapshot().getProgressDigest());
|
||||
|
||||
final int[] clusterByPartColumns = clusterBy.getColumns().stream().mapToInt(
|
||||
final int[] clusterByColumns = clusterBy.getColumns().stream().mapToInt(
|
||||
part -> signature.indexOf(part.columnName())
|
||||
).toArray();
|
||||
|
||||
|
@ -427,33 +468,36 @@ public class SuperSorterTest
|
|||
frameReader
|
||||
).forEach(
|
||||
row -> {
|
||||
final Object[] array = new Object[clusterByPartColumns.length];
|
||||
final Object[] array = new Object[clusterByColumns.length];
|
||||
|
||||
for (int i = 0; i < array.length; i++) {
|
||||
array[i] = row.get(clusterByPartColumns[i]);
|
||||
array[i] = row.get(clusterByColumns[i]);
|
||||
}
|
||||
|
||||
final RowKey key = createKey(clusterBy, array);
|
||||
|
||||
Assert.assertTrue(
|
||||
StringUtils.format(
|
||||
"Key %s >= partition %,d start %s",
|
||||
keyReader.read(key),
|
||||
partitionNumber,
|
||||
partition.getStart() == null ? null : keyReader.read(partition.getStart())
|
||||
),
|
||||
partition.getStart() == null || keyComparator.compare(key, partition.getStart()) >= 0
|
||||
);
|
||||
if (!(partition.getStart() == null || keyComparator.compare(key, partition.getStart()) >= 0)) {
|
||||
// Defer formatting of error message until it's actually needed
|
||||
Assert.fail(
|
||||
StringUtils.format(
|
||||
"Key %s >= partition %,d start %s",
|
||||
keyReader.read(key),
|
||||
partitionNumber,
|
||||
partition.getStart() == null ? null : keyReader.read(partition.getStart())
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
Assert.assertTrue(
|
||||
StringUtils.format(
|
||||
"Key %s < partition %,d end %s",
|
||||
keyReader.read(key),
|
||||
partitionNumber,
|
||||
partition.getEnd() == null ? null : keyReader.read(partition.getEnd())
|
||||
),
|
||||
partition.getEnd() == null || keyComparator.compare(key, partition.getEnd()) < 0
|
||||
);
|
||||
if (!(partition.getEnd() == null || keyComparator.compare(key, partition.getEnd()) < 0)) {
|
||||
Assert.fail(
|
||||
StringUtils.format(
|
||||
"Key %s < partition %,d end %s",
|
||||
keyReader.read(key),
|
||||
partitionNumber,
|
||||
partition.getEnd() == null ? null : keyReader.read(partition.getEnd())
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
readRows.add(row);
|
||||
}
|
||||
|
@ -464,21 +508,9 @@ public class SuperSorterTest
|
|||
MatcherAssert.assertThat(readRows.size(), Matchers.greaterThanOrEqualTo(Ints.checkedCast(limitHint)));
|
||||
}
|
||||
|
||||
final Sequence<List<Object>> expectedRows = Sequences.sort(
|
||||
FrameTestUtil.readRowsFromCursorFactory(cursorFactory, signature, true),
|
||||
Comparator.comparing(
|
||||
row -> {
|
||||
final Object[] array = new Object[clusterByPartColumns.length];
|
||||
|
||||
for (int i = 0; i < array.length; i++) {
|
||||
array[i] = row.get(clusterByPartColumns[i]);
|
||||
}
|
||||
|
||||
return createKey(clusterBy, array);
|
||||
},
|
||||
keyComparator
|
||||
)
|
||||
).limit(limitHint == SuperSorter.UNLIMITED ? Long.MAX_VALUE : readRows.size());
|
||||
final Sequence<List<Object>> expectedRows =
|
||||
Sequences.simple(getOrComputeSortedTestRows(clusterBy))
|
||||
.limit(limitHint == SuperSorter.UNLIMITED ? Long.MAX_VALUE : readRows.size());
|
||||
|
||||
FrameTestUtil.assertRowsEqual(expectedRows, Sequences.simple(readRows));
|
||||
}
|
||||
|
@ -724,29 +756,63 @@ public class SuperSorterTest
|
|||
final RowSignature keySignature = KeyTestUtils.createKeySignature(clusterBy.getColumns(), signature);
|
||||
return KeyTestUtils.createKey(keySignature, objects);
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve sorted test rows from {@link #SORTED_TEST_ROWS}, or else compute using
|
||||
* {@link #computeSortedTestRows(ClusterBy)}.
|
||||
*/
|
||||
private static List<List<Object>> getOrComputeSortedTestRows(final ClusterBy clusterBy)
|
||||
{
|
||||
return SORTED_TEST_ROWS.computeIfAbsent(clusterBy, SuperSorterTest.ParameterizedCasesTest::computeSortedTestRows);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sort test rows from {@link TestIndex#getNoRollupMMappedTestIndex()} by the given {@link ClusterBy}.
|
||||
*/
|
||||
private static List<List<Object>> computeSortedTestRows(final ClusterBy clusterBy)
|
||||
{
|
||||
final QueryableIndexCursorFactory cursorFactory =
|
||||
new QueryableIndexCursorFactory(TestIndex.getNoRollupMMappedTestIndex());
|
||||
final RowSignature signature =
|
||||
FrameWriters.sortableSignature(
|
||||
FrameSequenceBuilder.signatureWithRowNumber(cursorFactory.getRowSignature()),
|
||||
clusterBy.getColumns()
|
||||
);
|
||||
final RowSignature keySignature = KeyTestUtils.createKeySignature(clusterBy.getColumns(), signature);
|
||||
final int[] clusterByColumns =
|
||||
clusterBy.getColumns().stream().mapToInt(part -> signature.indexOf(part.columnName())).toArray();
|
||||
final Comparator<RowKey> keyComparator = clusterBy.keyComparator(keySignature);
|
||||
|
||||
return Sequences.sort(
|
||||
FrameTestUtil.readRowsFromCursorFactory(cursorFactory, signature, true),
|
||||
Comparator.comparing(
|
||||
row -> {
|
||||
final Object[] array = new Object[clusterByColumns.length];
|
||||
|
||||
for (int i = 0; i < array.length; i++) {
|
||||
array[i] = row.get(clusterByColumns[i]);
|
||||
}
|
||||
|
||||
return KeyTestUtils.createKey(keySignature, array);
|
||||
},
|
||||
keyComparator
|
||||
)
|
||||
).toList();
|
||||
}
|
||||
}
|
||||
|
||||
private static List<ReadableFrameChannel> makeFileChannels(
|
||||
/**
|
||||
* Distribute frames round-robin to some number of channels.
|
||||
*/
|
||||
private static List<ReadableFrameChannel> makeRoundRobinChannels(
|
||||
final Sequence<Frame> frames,
|
||||
final File tmpDir,
|
||||
final int numChannels
|
||||
) throws IOException
|
||||
{
|
||||
final List<File> files = new ArrayList<>();
|
||||
final List<WritableFrameChannel> writableChannels = new ArrayList<>();
|
||||
final List<BlockingQueueFrameChannel> channels = new ArrayList<>(numChannels);
|
||||
|
||||
for (int i = 0; i < numChannels; i++) {
|
||||
final File file = new File(tmpDir, StringUtils.format("channel-%d", i));
|
||||
files.add(file);
|
||||
writableChannels.add(
|
||||
new WritableFrameFileChannel(
|
||||
FrameFileWriter.open(
|
||||
Channels.newChannel(Files.newOutputStream(file.toPath())),
|
||||
null,
|
||||
ByteTracker.unboundedTracker()
|
||||
)
|
||||
)
|
||||
);
|
||||
channels.add(new BlockingQueueFrameChannel(2000) /* enough even for 1 row per frame; dataset has < 2000 rows */);
|
||||
}
|
||||
|
||||
frames.forEach(
|
||||
|
@ -758,7 +824,7 @@ public class SuperSorterTest
|
|||
public void accept(final Frame frame)
|
||||
{
|
||||
try {
|
||||
writableChannels.get(i % writableChannels.size()).write(frame);
|
||||
channels.get(i % channels.size()).writable().write(frame);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
|
@ -771,20 +837,11 @@ public class SuperSorterTest
|
|||
|
||||
final List<ReadableFrameChannel> retVal = new ArrayList<>();
|
||||
|
||||
for (int i = 0; i < writableChannels.size(); i++) {
|
||||
WritableFrameChannel writableChannel = writableChannels.get(i);
|
||||
writableChannel.close();
|
||||
retVal.add(new ReadableFileFrameChannel(FrameFile.open(files.get(i), null)));
|
||||
for (final BlockingQueueFrameChannel channel : channels) {
|
||||
channel.writable().close();
|
||||
retVal.add(channel.readable());
|
||||
}
|
||||
|
||||
return retVal;
|
||||
}
|
||||
|
||||
private static <T> long countSequence(final Sequence<T> sequence)
|
||||
{
|
||||
return sequence.accumulate(
|
||||
0L,
|
||||
(accumulated, in) -> accumulated + 1
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -67,6 +67,17 @@ public class FrameSequenceBuilder
|
|||
return new FrameSequenceBuilder(cursorFactory);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns what {@link #signature()} would return if {@link #populateRowNumber()} is set.
|
||||
*/
|
||||
public static RowSignature signatureWithRowNumber(final RowSignature signature)
|
||||
{
|
||||
return RowSignature.builder()
|
||||
.addAll(signature)
|
||||
.add(FrameTestUtil.ROW_NUMBER_COLUMN, ColumnType.LONG)
|
||||
.build();
|
||||
}
|
||||
|
||||
public FrameSequenceBuilder frameType(final FrameType frameType)
|
||||
{
|
||||
this.frameType = frameType;
|
||||
|
@ -108,10 +119,7 @@ public class FrameSequenceBuilder
|
|||
final RowSignature baseSignature;
|
||||
|
||||
if (populateRowNumber) {
|
||||
baseSignature = RowSignature.builder()
|
||||
.addAll(cursorFactory.getRowSignature())
|
||||
.add(FrameTestUtil.ROW_NUMBER_COLUMN, ColumnType.LONG)
|
||||
.build();
|
||||
baseSignature = signatureWithRowNumber(cursorFactory.getRowSignature());
|
||||
} else {
|
||||
baseSignature = cursorFactory.getRowSignature();
|
||||
}
|
||||
|
|
|
@ -56,9 +56,10 @@ import org.junit.Assert;
|
|||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.channels.Channels;
|
||||
import java.nio.file.Files;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
|
@ -79,12 +80,14 @@ public class FrameTestUtil
|
|||
|
||||
public static File writeFrameFile(final Sequence<Frame> frames, final File file) throws IOException
|
||||
{
|
||||
try (
|
||||
final FileOutputStream fos = new FileOutputStream(file);
|
||||
final FrameFileWriter writer = FrameFileWriter.open(
|
||||
Channels.newChannel(fos), null, ByteTracker.unboundedTracker()
|
||||
)
|
||||
) {
|
||||
writeFrameFile(frames, Files.newOutputStream(file.toPath()));
|
||||
return file;
|
||||
}
|
||||
|
||||
public static void writeFrameFile(final Sequence<Frame> frames, final OutputStream out) throws IOException
|
||||
{
|
||||
try (final FrameFileWriter writer =
|
||||
FrameFileWriter.open(Channels.newChannel(out), null, ByteTracker.unboundedTracker())) {
|
||||
frames.forEach(
|
||||
frame -> {
|
||||
try {
|
||||
|
@ -96,17 +99,15 @@ public class FrameTestUtil
|
|||
}
|
||||
);
|
||||
}
|
||||
|
||||
return file;
|
||||
}
|
||||
|
||||
public static File writeFrameFileWithPartitions(
|
||||
public static void writeFrameFileWithPartitions(
|
||||
final Sequence<IntObjectPair<Frame>> framesWithPartitions,
|
||||
final File file
|
||||
final OutputStream out
|
||||
) throws IOException
|
||||
{
|
||||
try (final FrameFileWriter writer = FrameFileWriter.open(
|
||||
Channels.newChannel(new FileOutputStream(file)),
|
||||
Channels.newChannel(out),
|
||||
null,
|
||||
ByteTracker.unboundedTracker()
|
||||
)) {
|
||||
|
@ -121,8 +122,6 @@ public class FrameTestUtil
|
|||
}
|
||||
);
|
||||
}
|
||||
|
||||
return file;
|
||||
}
|
||||
|
||||
public static void assertRowsEqual(final Sequence<List<Object>> expected, final Sequence<List<Object>> actual)
|
||||
|
|
Loading…
Reference in New Issue