More superbatch range partitioning tests (#9266)

More functional tests to cover handling of input data that has a
partition dimension that contains:

1) Null values: should be placed in the first partition.

2) Multiple values: should cause the superbatch task to fail.
Chi Cao Minh, 2020-02-10 15:17:54 -08:00 (committed by GitHub)
commit e8146d5914 (parent ea006dc72a)
6 changed files with 109 additions and 60 deletions

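For orientation, a condensed, hypothetical sketch of the dim1 shapes the new fixture generates. The names mirror createDim1Value() and LIST_DELIMITER in the test file later in this diff; this block itself is not part of the patch:

    public class Dim1Shapes
    {
      private static final String LIST_DELIMITER = "|";

      // Mirrors the fixture's createDim1Value(): null when rowIndex == fileIndex,
      // otherwise a single value or a '|'-joined multi-value string.
      static String dim1Value(int rowIndex, int fileIndex, boolean useMultivalueDim)
      {
        if (rowIndex == fileIndex) {
          return null; // ingested as a null dimension value -> first partition
        }
        String value = String.valueOf(fileIndex);
        return useMultivalueDim ? value + LIST_DELIMITER + value : value;
      }

      public static void main(String[] args)
      {
        System.out.println(dim1Value(0, 0, false)); // null  -> first partition
        System.out.println(dim1Value(1, 0, false)); // "0"   -> ordinary single value
        System.out.println(dim1Value(1, 0, true));  // "0|0" -> multi-value -> task FAILED
      }
    }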
NullValueHandlingConfig.java

@@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 public class NullValueHandlingConfig
 {
-  private static final String NULL_HANDLING_CONFIG_STRING = "druid.generic.useDefaultValueForNull";
+  public static final String NULL_HANDLING_CONFIG_STRING = "druid.generic.useDefaultValueForNull";
   @JsonProperty("useDefaultValueForNull")
   private final boolean useDefaultValuesForNull;

pom.xml

@@ -277,6 +277,11 @@
       <artifactId>equalsverifier</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.github.stefanbirkner</groupId>
+      <artifactId>system-rules</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>

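system-rules (com.github.stefanbirkner) supplies JUnit 4 rules for manipulating system properties and the environment; ProvideSystemProperty sets a property before each test and restores the previous value afterward, which is why the null-handling override added later in this diff cannot leak into other tests. A minimal usage sketch (hypothetical property name, real rule API):

    import org.junit.Assert;
    import org.junit.Rule;
    import org.junit.Test;
    import org.junit.contrib.java.lang.system.ProvideSystemProperty;

    public class ProvideSystemPropertyExampleTest
    {
      // Set for the duration of each test; the prior value is restored afterward.
      @Rule
      public final ProvideSystemProperty myProperty =
          new ProvideSystemProperty("my.property", "value");

      @Test
      public void propertyIsVisibleInsideTheTest()
      {
        Assert.assertEquals("value", System.getProperty("my.property"));
      }
    }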
AbstractMultiPhaseParallelIndexingTest.java

@@ -108,7 +108,8 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
       File inputDir,
       String filter,
       DimensionBasedPartitionsSpec partitionsSpec,
-      int maxNumConcurrentSubTasks
+      int maxNumConcurrentSubTasks,
+      TaskState expectedTaskStatus
   ) throws Exception
   {
     final ParallelIndexSupervisorTask task = newTask(
@@ -129,7 +130,7 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
     TaskStatus taskStatus = task.run(toolbox);
-    Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode());
+    Assert.assertEquals(expectedTaskStatus, taskStatus.getStatusCode());
     shutdownTask(task);
     return actionClient.getPublishedSegments();
   }

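Parameterizing the expected terminal state lets the same helper drive both success- and failure-path tests. A hedged sketch of a failing-path call, modeled on the range-partition test later in this diff; the helper's leading parameters are cut off by this hunk, so their exact shape here is an assumption:

    // Sketch: the caller states the terminal status it expects, so a FAILED run
    // passes the test instead of tripping the old hard-coded SUCCESS assertion.
    // A failed task publishes no segments, so segment assertions are skipped.
    final Set<DataSegment> publishedSegments = runTestTask(
        PARSE_SPEC,                // leading parameters assumed from the
        Intervals.of("2017/2018"), // range-partition test below
        inputDir,
        "test_*",
        partitionsSpec,
        maxNumConcurrentSubTasks,
        TaskState.FAILED // e.g. multi-value rows in the partition dimension
    );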
AbstractParallelIndexSupervisorTaskTest.java

@@ -170,6 +170,10 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
       catch (EntryExistsException e) {
         throw new RuntimeException(e);
       }
+      // WARNING: In production, subtasks are created via HTTP calls and instantiated by Jackson, which means they
+      // cannot share objects like they can here. For example, if the indexing task uses JsonParseSpec, the same
+      // JSONFlattenerMaker instance is shared among subtasks, which is bad since JSONFlattenerMaker is not thread-safe.
      tasks.put(subTask.getId(), service.submit(() -> {
        try {
          final TaskToolbox toolbox = createTaskToolbox(subTask);

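The new warning deserves a concrete picture: in production each subtask spec travels over HTTP and is re-instantiated by Jackson on the worker, so subtasks can never share live objects, while this in-process test harness submits the same instances to a thread pool. A self-contained sketch of the production-path behavior (hypothetical Spec class, standard Jackson API):

    import com.fasterxml.jackson.databind.ObjectMapper;

    public class NoSharedInstancesDemo
    {
      // Hypothetical stand-in for a task spec; real specs are much larger.
      public static class Spec
      {
        public String id = "subtask-1";
      }

      public static void main(String[] args) throws Exception
      {
        ObjectMapper mapper = new ObjectMapper();
        Spec original = new Spec();
        // Production path: serialize for the HTTP call, deserialize on the worker.
        Spec copy = mapper.readValue(mapper.writeValueAsString(original), Spec.class);
        // Distinct instances, so no accidental sharing of non-thread-safe state.
        System.out.println(original != copy); // true
      }
    }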
HashPartitionMultiPhaseParallelIndexingTest.java

@@ -26,6 +26,7 @@ import org.apache.druid.data.input.impl.CSVParseSpec;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.TaskToolbox;
@@ -131,7 +132,8 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
         inputDir,
         "test_*",
         new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2")),
-        MAX_NUM_CONCURRENT_SUB_TASKS
+        MAX_NUM_CONCURRENT_SUB_TASKS,
+        TaskState.SUCCESS
     );
     assertHashedPartition(publishedSegments);
   }

RangePartitionMultiPhaseParallelIndexingTest.java

@@ -25,11 +25,13 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.SetMultimap;
 import org.apache.druid.client.indexing.IndexingServiceClient;
+import org.apache.druid.common.config.NullValueHandlingConfig;
 import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.impl.CSVParseSpec;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.TaskToolbox;
@@ -38,7 +40,6 @@ import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.query.scan.ScanResultValue;
 import org.apache.druid.timeline.DataSegment;
@@ -48,7 +49,9 @@ import org.hamcrest.Matchers;
 import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.contrib.java.lang.system.ProvideSystemProperty;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -67,63 +70,76 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
+import java.util.stream.Collectors;
 @RunWith(Parameterized.class)
 public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiPhaseParallelIndexingTest
 {
+  private static final boolean USE_INPUT_FORMAT_API = true;
+  private static final boolean USE_MULTIVALUE_DIM = true;
   private static final int NUM_FILE = 10;
   private static final int NUM_ROW = 20;
-  private static final int NUM_DAY = 2;
+  private static final int DIM_FILE_CARDINALITY = 2;
   private static final int NUM_PARTITION = 2;
   private static final int YEAR = 2017;
+  private static final String TIME = "ts";
   private static final String DIM1 = "dim1";
   private static final String DIM2 = "dim2";
+  private static final String LIST_DELIMITER = "|";
   private static final List<String> DIMS = ImmutableList.of(DIM1, DIM2);
   private static final String TEST_FILE_NAME_PREFIX = "test_";
   private static final ParseSpec PARSE_SPEC = new CSVParseSpec(
       new TimestampSpec(
-          "ts",
+          TIME,
           "auto",
           null
      ),
      new DimensionsSpec(
-          DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", DIM1, DIM2)),
-          new ArrayList<>(),
-          new ArrayList<>()
+          DimensionsSpec.getDefaultSchemas(Arrays.asList(TIME, DIM1, DIM2)),
+          Collections.emptyList(),
+          Collections.emptyList()
      ),
-      null,
-      Arrays.asList("ts", DIM1, DIM2, "val"),
+      LIST_DELIMITER,
+      Arrays.asList(TIME, DIM1, DIM2, "val"),
      false,
      0
  );
-  @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}, maxNumConcurrentSubTasks={2}")
+  @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}, maxNumConcurrentSubTasks={2}, useMultiValueDim={3}")
   public static Iterable<Object[]> constructorFeeder()
   {
     return ImmutableList.of(
-        new Object[]{LockGranularity.TIME_CHUNK, false, 2},
-        new Object[]{LockGranularity.TIME_CHUNK, true, 2},
-        new Object[]{LockGranularity.SEGMENT, true, 2},
-        new Object[]{LockGranularity.SEGMENT, true, 1} // currently spawns subtask instead of running in supervisor
+        new Object[]{LockGranularity.TIME_CHUNK, !USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM},
+        new Object[]{LockGranularity.TIME_CHUNK, USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM},
+        new Object[]{LockGranularity.SEGMENT, USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM},
+        new Object[]{LockGranularity.SEGMENT, USE_INPUT_FORMAT_API, 1, !USE_MULTIVALUE_DIM}, // will spawn subtask
+        new Object[]{LockGranularity.SEGMENT, USE_INPUT_FORMAT_API, 2, USE_MULTIVALUE_DIM} // expected to fail
    );
  }
+  // Interpret empty values in CSV as null
+  @Rule
+  public final ProvideSystemProperty noDefaultNullValue = new ProvideSystemProperty(
+      NullValueHandlingConfig.NULL_HANDLING_CONFIG_STRING,
+      "false"
+  );
   private File inputDir;
-  private SetMultimap<Interval, String> intervalToDim1;
+  private SetMultimap<Interval, List<Object>> intervalToDims;
   private final int maxNumConcurrentSubTasks;
+  private final boolean useMultivalueDim;
   public RangePartitionMultiPhaseParallelIndexingTest(
       LockGranularity lockGranularity,
       boolean useInputFormatApi,
-      int maxNumConcurrentSubTasks
+      int maxNumConcurrentSubTasks,
+      boolean useMultivalueDim
   )
   {
     super(lockGranularity, useInputFormatApi);
     this.maxNumConcurrentSubTasks = maxNumConcurrentSubTasks;
+    this.useMultivalueDim = useMultivalueDim;
   }
   @Override
@@ -132,41 +148,65 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
   {
     super.setup();
     inputDir = temporaryFolder.newFolder("data");
-    intervalToDim1 = createInputFiles(inputDir);
+    intervalToDims = createInputFiles(inputDir, useMultivalueDim);
   }
-  private static SetMultimap<Interval, String> createInputFiles(File inputDir) throws IOException
+  private static SetMultimap<Interval, List<Object>> createInputFiles(File inputDir, boolean useMultivalueDim)
+      throws IOException
   {
-    SetMultimap<Interval, String> intervalToDim1 = HashMultimap.create();
+    SetMultimap<Interval, List<Object>> intervalToDims = HashMultimap.create();
     for (int fileIndex = 0; fileIndex < NUM_FILE; fileIndex++) {
       Path path = new File(inputDir, TEST_FILE_NAME_PREFIX + fileIndex).toPath();
       try (final Writer writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8)) {
-        for (int i = 0; i < (NUM_ROW / NUM_DAY); i++) {
-          for (int d = 0; d < NUM_DAY; d++) {
-            writeRow(writer, i + d, fileIndex + d, intervalToDim1);
+        for (int i = 0; i < (NUM_ROW / DIM_FILE_CARDINALITY); i++) {
+          for (int d = 0; d < DIM_FILE_CARDINALITY; d++) {
+            int rowIndex = i * DIM_FILE_CARDINALITY + d;
+            String dim1Value = createDim1Value(rowIndex, fileIndex, useMultivalueDim);
+            writeRow(writer, i + d, dim1Value, fileIndex, intervalToDims);
           }
         }
       }
     }
-    return intervalToDim1;
+    return intervalToDims;
   }
-  private static void writeRow(Writer writer, int day, int fileIndex, Multimap<Interval, String> intervalToDim1)
-      throws IOException
+  @Nullable
+  private static String createDim1Value(int rowIndex, int fileIndex, boolean useMultivalueDim)
+  {
+    if (rowIndex == fileIndex) {
+      return null;
+    }
+    String dim1Value = String.valueOf(fileIndex);
+    return useMultivalueDim ? dim1Value + LIST_DELIMITER + dim1Value : dim1Value;
+  }
+  private static void writeRow(
+      Writer writer,
+      int day,
+      @Nullable String dim1Value,
+      int fileIndex,
+      Multimap<Interval, List<Object>> intervalToDims
+  ) throws IOException
   {
     Interval interval = Intervals.of("%s-12-%d/%s-12-%d", YEAR, day + 1, YEAR, day + 2);
     String startDate = interval.getStart().toString("y-M-d");
-    String dim1Value = String.valueOf(fileIndex + 10);
-    writer.write(StringUtils.format("%s,%s,%d th test file\n", startDate, dim1Value, fileIndex));
-    intervalToDim1.put(interval, dim1Value);
+    String dim2Value = "test file " + fileIndex;
+    String row = startDate + ",";
+    if (dim1Value != null) {
+      row += dim1Value;
+    }
+    row += "," + dim2Value + "\n";
+    writer.write(row);
+    intervalToDims.put(interval, Arrays.asList(dim1Value, dim2Value));
   }
   @Test
   public void createsCorrectRangePartitions() throws Exception
   {
-    int targetRowsPerSegment = NUM_ROW / NUM_DAY / NUM_PARTITION;
+    int targetRowsPerSegment = NUM_ROW / DIM_FILE_CARDINALITY / NUM_PARTITION;
     final Set<DataSegment> publishedSegments = runTestTask(
         PARSE_SPEC,
         Intervals.of("%s/%s", YEAR, YEAR + 1),
@@ -178,9 +218,13 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
             DIM1,
             false
         ),
-        maxNumConcurrentSubTasks
+        maxNumConcurrentSubTasks,
+        useMultivalueDim ? TaskState.FAILED : TaskState.SUCCESS
     );
-    assertRangePartitions(publishedSegments);
+    if (!useMultivalueDim) {
+      assertRangePartitions(publishedSegments);
+    }
   }
   private void assertRangePartitions(Set<DataSegment> publishedSegments) throws IOException
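Worth making the sizing arithmetic explicit, derived entirely from the constants declared in this file:

    // Fixture sizing: NUM_FILE = 10 input files, NUM_ROW = 20 rows per file,
    // DIM_FILE_CARDINALITY = 2, NUM_PARTITION = 2.
    // targetRowsPerSegment = NUM_ROW / DIM_FILE_CARDINALITY / NUM_PARTITION
    //                      = 20 / 2 / 2 = 5
    // With the new fixture, every interval gets exactly NUM_PARTITION = 2 range
    // partitions, which is why the simplified assertNumPartition() below no
    // longer special-cases the first and last intervals.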
@@ -188,16 +232,13 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
     Multimap<Interval, DataSegment> intervalToSegments = ArrayListMultimap.create();
     publishedSegments.forEach(s -> intervalToSegments.put(s.getInterval(), s));
-    SortedSet<Interval> publishedIntervals = new TreeSet<>(Comparators.intervalsByStartThenEnd());
-    publishedIntervals.addAll(intervalToSegments.keySet());
+    Set<Interval> publishedIntervals = intervalToSegments.keySet();
     assertHasExpectedIntervals(publishedIntervals);
-    Interval firstInterval = publishedIntervals.first();
-    Interval lastInterval = publishedIntervals.last();
     File tempSegmentDir = temporaryFolder.newFolder();
     intervalToSegments.asMap().forEach((interval, segments) -> {
-      assertNumPartition(interval, segments, firstInterval, lastInterval);
+      assertNumPartition(segments);
       List<String> allValues = new ArrayList<>(NUM_ROW);
       for (DataSegment segment : segments) {
@@ -212,22 +253,12 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
   private void assertHasExpectedIntervals(Set<Interval> publishedSegmentIntervals)
   {
-    Assert.assertEquals(intervalToDim1.keySet(), publishedSegmentIntervals);
+    Assert.assertEquals(intervalToDims.keySet(), publishedSegmentIntervals);
   }
-  private static void assertNumPartition(
-      Interval interval,
-      Collection<DataSegment> segments,
-      Interval firstInterval,
-      Interval lastInterval
-  )
+  private static void assertNumPartition(Collection<DataSegment> segments)
   {
-    int expectedNumPartition = NUM_PARTITION;
-    if (interval.equals(firstInterval) || interval.equals(lastInterval)) {
-      expectedNumPartition -= 1;
-    }
-    expectedNumPartition *= NUM_DAY;
-    Assert.assertEquals(expectedNumPartition, segments.size());
+    Assert.assertEquals(NUM_PARTITION, segments.size());
   }
   private List<String> getColumnValues(DataSegment segment, File tempDir)
@@ -253,18 +284,24 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
       }
       if (end != null) {
-        Assert.assertThat(value.compareTo(end), Matchers.lessThan(0));
+        if (value == null) {
+          Assert.assertNull("null values should be in first partition", start);
+        } else {
+          Assert.assertThat(value.compareTo(end), Matchers.lessThan(0));
+        }
       }
     }
   }
   private void assertIntervalHasAllExpectedValues(Interval interval, List<String> actualValues)
   {
-    List<String> expectedValues = new ArrayList<>(intervalToDim1.get(interval));
-    Assert.assertEquals(expectedValues.size(), actualValues.size());
-    Collections.sort(expectedValues);
-    Collections.sort(actualValues);
-    Assert.assertEquals(expectedValues, actualValues);
+    List<String> expectedValues = intervalToDims.get(interval)
+                                                .stream()
+                                                .map(d -> (String) d.get(0))
+                                                .sorted(Comparators.naturalNullsFirst())
+                                                .collect(Collectors.toList());
+    actualValues.sort(Comparators.naturalNullsFirst());
+    Assert.assertEquals(interval.toString(), expectedValues, actualValues);
   }
   @Override
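Since dim1 now contains nulls, both expected and actual value lists are sorted with a null-first comparator before comparison; Collections.sort would throw NullPointerException on a null element. A small sketch using the same Comparators.naturalNullsFirst() helper the patch calls above:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import org.apache.druid.java.util.common.guava.Comparators;

    public class NullsFirstSortDemo
    {
      public static void main(String[] args)
      {
        List<String> values = new ArrayList<>(Arrays.asList("1", null, "0"));
        // Natural ordering, with null sorting before every non-null value,
        // matching range partitioning's placement of nulls in the first partition.
        values.sort(Comparators.naturalNullsFirst());
        System.out.println(values); // [null, 0, 1]
      }
    }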