Improve unit test coverage for MSQ (#13398)

* add faults tests for the multi stage query

* add too many partitions fault

* add TooManyInputFilesFault

* programmatically generate the file

* refactor

* Trigger Build
Laksh Singla 2022-11-29 17:27:04 +05:30 committed by GitHub
parent 4ed6255bdf
commit 79df11c16c
7 changed files with 301 additions and 74 deletions

View File: MSQFaultsTest.java

@ -0,0 +1,215 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.msq.exec;

import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.msq.indexing.error.TooManyClusteredByColumnsFault;
import org.apache.druid.msq.indexing.error.TooManyColumnsFault;
import org.apache.druid.msq.indexing.error.TooManyInputFilesFault;
import org.apache.druid.msq.indexing.error.TooManyPartitionsFault;
import org.apache.druid.msq.indexing.error.UnknownFault;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.test.MSQTestFileUtils;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
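
/**
 * Tests that verify the faults raised by the multi-stage query (MSQ) engine when ingest queries exceed its
 * limits or use unsupported features.
 */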
public class MSQFaultsTest extends MSQTestBase
{
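// Generates a 30,000-row NDJSON file and ingests it with rowsPerSegment = 1, so the query would produce
// one partition per row, exceeding the partition limit (25,000, per the fault asserted below).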
@Test
public void testInsertWithTooManySegments() throws IOException
{
Map<String, Object> context = ImmutableMap.<String, Object>builder()
.putAll(DEFAULT_MSQ_CONTEXT)
.put("rowsPerSegment", 1)
.build();
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.build();
File file = MSQTestFileUtils.generateTemporaryNdJsonFile(30000, 1);
String filePathAsJson = queryFramework().queryJsonMapper().writeValueAsString(file.getAbsolutePath());
testIngestQuery().setSql(" insert into foo1 SELECT\n"
+ " floor(TIME_PARSE(\"timestamp\") to day) AS __time\n"
+ "FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{ \"files\": [" + filePathAsJson + "],\"type\":\"local\"}',\n"
+ " '{\"type\": \"json\"}',\n"
+ " '[{\"name\": \"timestamp\",\"type\":\"string\"}]'\n"
+ " )\n"
+ ") PARTITIONED by day")
.setExpectedDataSource("foo1")
.setExpectedRowSignature(rowSignature)
.setQueryContext(context)
.setExpectedMSQFault(new TooManyPartitionsFault(25000))
.verifyResults();
}
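
// The extern signature types col1 as "long_array", which resolves to ARRAY<LONG>; MSQ cannot create a
// dimension of that type here, so the underlying ISE surfaces as an UnknownFault.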
@Test
public void testInsertWithUnsupportedColumnType()
{
RowSignature dummyRowSignature = RowSignature.builder().add("__time", ColumnType.LONG).build();
testIngestQuery()
.setSql(StringUtils.format(
" insert into foo1 SELECT\n"
+ " floor(TIME_PARSE(\"timestamp\") to day) AS __time,\n"
+ " col1\n"
+ "FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{ \"files\": [\"ignored\"],\"type\":\"local\"}',\n"
+ " '{\"type\": \"json\"}',\n"
+ " '[{\"name\": \"timestamp\", \"type\": \"string\"},{\"name\": \"col1\", \"type\": \"long_array\"} ]'\n"
+ " )\n"
+ ") PARTITIONED by day"
))
.setExpectedDataSource("foo1")
.setExpectedRowSignature(dummyRowSignature)
.setExpectedMSQFault(UnknownFault.forMessage(
"org.apache.druid.java.util.common.ISE: Cannot create dimension for type [ARRAY<LONG>]"))
.verifyResults();
}
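
// Selects close to 2,000 generated string columns, pushing the stage's row signature past the
// 2,000-column limit reported by the TooManyColumnsFault below.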
@Test
public void testInsertWithManyColumns()
{
RowSignature dummyRowSignature = RowSignature.builder().add("__time", ColumnType.LONG).build();
final int numColumns = 2000;
String columnNames = IntStream.range(1, numColumns)
.mapToObj(i -> "col" + i).collect(Collectors.joining(", "));
String externSignature = IntStream.range(1, numColumns)
.mapToObj(i -> StringUtils.format(
"{\"name\": \"col%d\", \"type\": \"string\"}",
i
))
.collect(Collectors.joining(", "));
testIngestQuery()
.setSql(StringUtils.format(
" insert into foo1 SELECT\n"
+ " floor(TIME_PARSE(\"timestamp\") to day) AS __time,\n"
+ " %s\n"
+ "FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{ \"files\": [\"ignored\"],\"type\":\"local\"}',\n"
+ " '{\"type\": \"json\"}',\n"
+ " '[{\"name\": \"timestamp\", \"type\": \"string\"}, %s]'\n"
+ " )\n"
+ ") PARTITIONED by day",
columnNames,
externSignature
))
.setExpectedDataSource("foo1")
.setExpectedRowSignature(dummyRowSignature)
.setExpectedMSQFault(new TooManyColumnsFault(numColumns + 2, 2000))
.verifyResults();
}
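
// CLUSTERED BY references 1,700 output columns by ordinal, exceeding the 1,500-column clustering limit
// reported by the TooManyClusteredByColumnsFault below.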
@Test
public void testInsertWithHugeClusteringKeys()
{
RowSignature dummyRowSignature = RowSignature.builder().add("__time", ColumnType.LONG).build();
final int numColumns = 1700;
String columnNames = IntStream.range(1, numColumns)
.mapToObj(i -> "col" + i).collect(Collectors.joining(", "));
String clusteredByClause = IntStream.range(1, numColumns + 1)
.mapToObj(String::valueOf)
.collect(Collectors.joining(", "));
String externSignature = IntStream.range(1, numColumns)
.mapToObj(i -> StringUtils.format(
"{\"name\": \"col%d\", \"type\": \"string\"}",
i
))
.collect(Collectors.joining(", "));
testIngestQuery()
.setSql(StringUtils.format(
" insert into foo1 SELECT\n"
+ " floor(TIME_PARSE(\"timestamp\") to day) AS __time,\n"
+ " %s\n"
+ "FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{ \"files\": [\"ignored\"],\"type\":\"local\"}',\n"
+ " '{\"type\": \"json\"}',\n"
+ " '[{\"name\": \"timestamp\", \"type\": \"string\"}, %s]'\n"
+ " )\n"
+ ") PARTITIONED by day CLUSTERED BY %s",
columnNames,
externSignature,
clusteredByClause
))
.setExpectedDataSource("foo1")
.setExpectedRowSignature(dummyRowSignature)
.setExpectedMSQFault(new TooManyClusteredByColumnsFault(numColumns + 2, 1500, 0))
.verifyResults();
}
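
// Lists the same sample file 20,000 times in the extern spec, which is expected to exceed the number of
// files the available workers can be assigned given Limits.MAX_INPUT_FILES_PER_WORKER.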
@Test
public void testTooManyInputFiles() throws IOException
{
RowSignature dummyRowSignature = RowSignature.builder().add("__time", ColumnType.LONG).build();
final int numFiles = 20000;
final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(this, "/wikipedia-sampled.json");
final String toReadFileNameAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());
String externalFiles = String.join(", ", Collections.nCopies(numFiles, toReadFileNameAsJson));
testIngestQuery()
.setSql(StringUtils.format(
"insert into foo1 SELECT\n"
+ " floor(TIME_PARSE(\"timestamp\") to day) AS __time\n"
+ "FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{ \"files\": [%s],\"type\":\"local\"}',\n"
+ " '{\"type\": \"csv\", \"hasHeaderRow\": true}',\n"
+ " '[{\"name\": \"timestamp\", \"type\": \"string\"}]'\n"
+ " )\n"
+ ") PARTITIONED by day",
externalFiles
))
.setExpectedDataSource("foo1")
.setExpectedRowSignature(dummyRowSignature)
.setExpectedMSQFault(new TooManyInputFilesFault(numFiles, Limits.MAX_INPUT_FILES_PER_WORKER, 2))
.verifyResults();
}
}

View File: MSQInsertTest.java

@ -27,13 +27,12 @@ import com.google.common.hash.Hashing;
import org.apache.druid.hll.HyperLogLogCollector;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.msq.indexing.error.ColumnNameRestrictedFault;
import org.apache.druid.msq.indexing.error.InsertTimeNullFault;
import org.apache.druid.msq.indexing.error.RowTooLargeFault;
import org.apache.druid.msq.indexing.error.TooManyClusteredByColumnsFault;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.test.MSQTestFileUtils;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
@ -55,8 +54,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class MSQInsertTest extends MSQTestBase
{
@ -83,7 +80,7 @@ public class MSQInsertTest extends MSQTestBase
@Test
public void testInsertOnExternalDataSource() throws IOException
{
final File toRead = getResourceAsTemporaryFile("/wikipedia-sampled.json");
final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(this, "/wikipedia-sampled.json");
final String toReadFileNameAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());
RowSignature rowSignature = RowSignature.builder()
@ -308,7 +305,7 @@ public class MSQInsertTest extends MSQTestBase
@Test
public void testRollUpOnExternalDataSource() throws IOException
{
final File toRead = getResourceAsTemporaryFile("/wikipedia-sampled.json");
final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(this, "/wikipedia-sampled.json");
final String toReadFileNameAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());
RowSignature rowSignature = RowSignature.builder()
@ -344,7 +341,7 @@ public class MSQInsertTest extends MSQTestBase
@Test()
public void testRollUpOnExternalDataSourceWithCompositeKey() throws IOException
{
final File toRead = getResourceAsTemporaryFile("/wikipedia-sampled.json");
final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(this, "/wikipedia-sampled.json");
final String toReadFileNameAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());
RowSignature rowSignature = RowSignature.builder()
@ -448,48 +445,6 @@ public class MSQInsertTest extends MSQTestBase
.verifyPlanningErrors();
}
@Test
public void testInsertWithHugeClusteringKeys()
{
RowSignature dummyRowSignature = RowSignature.builder().add("__time", ColumnType.LONG).build();
final int numColumns = 1700;
String columnNames = IntStream.range(1, numColumns)
.mapToObj(i -> "col" + i).collect(Collectors.joining(", "));
String clusteredByClause = IntStream.range(1, numColumns + 1)
.mapToObj(String::valueOf)
.collect(Collectors.joining(", "));
String externSignature = IntStream.range(1, numColumns)
.mapToObj(i -> StringUtils.format(
"{\"name\": \"col%d\", \"type\": \"string\"}",
i
))
.collect(Collectors.joining(", "));
testIngestQuery()
.setSql(StringUtils.format(
" insert into foo1 SELECT\n"
+ " floor(TIME_PARSE(\"timestamp\") to day) AS __time,\n"
+ " %s\n"
+ "FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{ \"files\": [\"ignored\"],\"type\":\"local\"}',\n"
+ " '{\"type\": \"json\"}',\n"
+ " '[{\"name\": \"timestamp\", \"type\": \"string\"}, %s]'\n"
+ " )\n"
+ ") PARTITIONED by day CLUSTERED BY %s",
columnNames,
externSignature,
clusteredByClause
))
.setExpectedDataSource("foo1")
.setExpectedRowSignature(dummyRowSignature)
.setExpectedMSQFault(new TooManyClusteredByColumnsFault(numColumns + 2, 1500, 0))
.verifyResults();
}
@Test
public void testInsertRestrictedColumns()
@ -542,7 +497,7 @@ public class MSQInsertTest extends MSQTestBase
@Test
public void testInsertWithTooLargeRowShouldThrowException() throws IOException
{
final File toRead = getResourceAsTemporaryFile("/wikipedia-sampled.json");
final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(this, "/wikipedia-sampled.json");
final String toReadFileNameAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());
Mockito.doReturn(500).when(workerMemoryParameters).getLargeFrameSize();

View File: MSQReplaceTest.java

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.test.MSQTestFileUtils;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.SqlPlanningException;
@ -109,7 +110,7 @@ public class MSQReplaceTest extends MSQTestBase
.add("__time", ColumnType.LONG)
.add("cnt", ColumnType.LONG).build();
final File toRead = getResourceAsTemporaryFile("/wikipedia-sampled.json");
final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(this, "/wikipedia-sampled.json");
final String toReadFileNameAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());
testIngestQuery().setSql(" REPLACE INTO foo1 OVERWRITE ALL SELECT "
@ -147,7 +148,7 @@ public class MSQReplaceTest extends MSQTestBase
.add("__time", ColumnType.LONG)
.add("user", ColumnType.STRING).build();
final File toRead = getResourceAsTemporaryFile("/wikipedia-sampled.json");
final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(this, "/wikipedia-sampled.json");
final String toReadFileNameAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());
testIngestQuery().setSql(" REPLACE INTO foo1 OVERWRITE WHERE __time >= TIMESTAMP '2016-06-27 01:00:00.00' AND __time < TIMESTAMP '2016-06-27 02:00:00.00' "

View File: MSQSelectTest.java

@ -34,6 +34,7 @@ import org.apache.druid.msq.indexing.MSQTuningConfig;
import org.apache.druid.msq.indexing.error.CannotParseExternalDataFault;
import org.apache.druid.msq.shuffle.DurableStorageUtils;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.test.MSQTestFileUtils;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.TableDataSource;
@ -733,7 +734,7 @@ public class MSQSelectTest extends MSQTestBase
@Test
public void testExternSelect1() throws IOException
{
final File toRead = getResourceAsTemporaryFile("/wikipedia-sampled.json");
final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(this, "/wikipedia-sampled.json");
final String toReadAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());
RowSignature rowSignature = RowSignature.builder()

View File: MSQWarningsTest.java

@ -29,6 +29,7 @@ import org.apache.druid.msq.indexing.ColumnMappings;
import org.apache.druid.msq.indexing.MSQSpec;
import org.apache.druid.msq.indexing.MSQTuningConfig;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.test.MSQTestFileUtils;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.Query;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
@ -65,7 +66,7 @@ public class MSQWarningsTest extends MSQTestBase
@Before
public void setUp3() throws IOException
{
toRead = getResourceAsTemporaryFile("/unparseable.gz");
toRead = MSQTestFileUtils.getResourceAsTemporaryFile(this, "/unparseable.gz");
toReadFileNameAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());
rowSignature = RowSignature.builder()

View File: MSQTestBase.java

@ -28,7 +28,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.io.ByteStreams;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
@ -56,7 +55,6 @@ import org.apache.druid.indexing.common.task.CompactionTask;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.initialization.CoreInjectorBuilder;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
@ -167,8 +165,6 @@ import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -446,22 +442,6 @@ public class MSQTestBase extends BaseCalciteQueryTest
}
}
/**
* Helper method that copies a resource to a temporary file, then returns it.
*/
protected File getResourceAsTemporaryFile(final String resource) throws IOException
{
final File file = temporaryFolder.newFile();
final InputStream stream = getClass().getResourceAsStream(resource);
if (stream == null) {
throw new IOE("No such resource [%s]", resource);
}
ByteStreams.copy(stream, Files.newOutputStream(file.toPath()));
return file;
}
@Nonnull
private Supplier<Pair<Segment, Closeable>> getSupplierForSegment(SegmentId segmentId)
{

View File: MSQTestFileUtils.java

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.msq.test;

import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteStreams;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;

public class MSQTestFileUtils
{
/**
 * Helper method that copies a classpath resource to a temporary file, then returns it. The {@code object}
 * argument supplies the class used to resolve the resource, so callers typically pass {@code this}.
 */
public static File getResourceAsTemporaryFile(Object object, final String resource) throws IOException
{
final File file = BaseCalciteQueryTest.temporaryFolder.newFile();
final InputStream stream = object.getClass().getResourceAsStream(resource);
if (stream == null) {
throw new IOE("No such resource [%s]", resource);
}
ByteStreams.copy(stream, Files.newOutputStream(file.toPath()));
return file;
}

/**
 * Helper method that populates a temporary NDJSON file with {@code numRows} rows and {@code numColumns} columns,
 * where the first column is a string named "timestamp" and the rest are string columns holding junk values.
 * For example, numColumns = 2 yields lines like {"timestamp":"2016-06-27T00:00:11.080Z","column1":"val0"}.
 */
public static File generateTemporaryNdJsonFile(final int numRows, final int numColumns) throws IOException
{
final File file = BaseCalciteQueryTest.temporaryFolder.newFile();
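// Append one JSON object per line (NDJSON). Every row shares a fixed timestamp, and the remaining
// columns hold generated junk values keyed by the row number.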
for (int currentRow = 0; currentRow < numRows; ++currentRow) {
StringBuilder sb = new StringBuilder();
sb.append("{");
sb.append("\"timestamp\":\"2016-06-27T00:00:11.080Z\"");
for (int currentColumn = 1; currentColumn < numColumns; ++currentColumn) {
sb.append(StringUtils.format(",\"column%s\":\"val%s\"", currentColumn, currentRow));
}
sb.append("}");
Files.write(file.toPath(), ImmutableList.of(sb.toString()), StandardCharsets.UTF_8, StandardOpenOption.APPEND);
}
file.deleteOnExit();
return file;
}
}