diff --git a/core/src/main/java/org/apache/druid/common/config/NullValueHandlingConfig.java b/core/src/main/java/org/apache/druid/common/config/NullValueHandlingConfig.java index ae15d95969c..fa059478226 100644 --- a/core/src/main/java/org/apache/druid/common/config/NullValueHandlingConfig.java +++ b/core/src/main/java/org/apache/druid/common/config/NullValueHandlingConfig.java @@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; public class NullValueHandlingConfig { - private static final String NULL_HANDLING_CONFIG_STRING = "druid.generic.useDefaultValueForNull"; + public static final String NULL_HANDLING_CONFIG_STRING = "druid.generic.useDefaultValueForNull"; @JsonProperty("useDefaultValueForNull") private final boolean useDefaultValuesForNull; diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml index 19397787a85..82564cbacdf 100644 --- a/indexing-service/pom.xml +++ b/indexing-service/pom.xml @@ -277,6 +277,11 @@ equalsverifier test + + com.github.stefanbirkner + system-rules + test + diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java index a7c4396b780..aaa31ad134e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java @@ -108,7 +108,8 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn File inputDir, String filter, DimensionBasedPartitionsSpec partitionsSpec, - int maxNumConcurrentSubTasks + int maxNumConcurrentSubTasks, + TaskState expectedTaskStatus ) throws Exception { final ParallelIndexSupervisorTask task = newTask( @@ -129,7 +130,7 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn TaskStatus taskStatus = task.run(toolbox); - Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode()); + Assert.assertEquals(expectedTaskStatus, taskStatus.getStatusCode()); shutdownTask(task); return actionClient.getPublishedSegments(); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java index 029d2bf20c6..9731272fc50 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java @@ -170,6 +170,10 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase catch (EntryExistsException e) { throw new RuntimeException(e); } + + // WARNING: In production, subtasks are created via HTTP calls and instantiated by Jackson, which means they + // cannot share objects like they can here. For example, if the indexing task uses JsonParseSpec, the same + // JSONFlattenerMaker instance is shared among subtasks, which is bad since JSONFlattenerMaker is not thread-safe. tasks.put(subTask.getId(), service.submit(() -> { try { final TaskToolbox toolbox = createTaskToolbox(subTask); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java index 7219f16ef88..4670d01b7b8 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java @@ -26,6 +26,7 @@ import org.apache.druid.data.input.impl.CSVParseSpec; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.ParseSpec; import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.TaskToolbox; @@ -131,7 +132,8 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh inputDir, "test_*", new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2")), - MAX_NUM_CONCURRENT_SUB_TASKS + MAX_NUM_CONCURRENT_SUB_TASKS, + TaskState.SUCCESS ); assertHashedPartition(publishedSegments); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java index fcdd5bb088e..a4b83320e57 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java @@ -25,11 +25,13 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Multimap; import com.google.common.collect.SetMultimap; import org.apache.druid.client.indexing.IndexingServiceClient; +import org.apache.druid.common.config.NullValueHandlingConfig; import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.impl.CSVParseSpec; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.ParseSpec; import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.TaskToolbox; @@ -38,7 +40,6 @@ import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.common.task.TestAppenderatorsManager; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.query.scan.ScanResultValue; import org.apache.druid.timeline.DataSegment; @@ -48,7 +49,9 @@ import org.hamcrest.Matchers; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.contrib.java.lang.system.ProvideSystemProperty; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -67,63 +70,76 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; import java.util.stream.Collectors; @RunWith(Parameterized.class) public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiPhaseParallelIndexingTest { + private static final boolean USE_INPUT_FORMAT_API = true; + private static final boolean USE_MULTIVALUE_DIM = true; private static final int NUM_FILE = 10; private static final int NUM_ROW = 20; - private static final int NUM_DAY = 2; + private static final int DIM_FILE_CARDINALITY = 2; private static final int NUM_PARTITION = 2; private static final int YEAR = 2017; + private static final String TIME = "ts"; private static final String DIM1 = "dim1"; private static final String DIM2 = "dim2"; + private static final String LIST_DELIMITER = "|"; private static final List DIMS = ImmutableList.of(DIM1, DIM2); private static final String TEST_FILE_NAME_PREFIX = "test_"; private static final ParseSpec PARSE_SPEC = new CSVParseSpec( new TimestampSpec( - "ts", + TIME, "auto", null ), new DimensionsSpec( - DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", DIM1, DIM2)), - new ArrayList<>(), - new ArrayList<>() + DimensionsSpec.getDefaultSchemas(Arrays.asList(TIME, DIM1, DIM2)), + Collections.emptyList(), + Collections.emptyList() ), - null, - Arrays.asList("ts", DIM1, DIM2, "val"), + LIST_DELIMITER, + Arrays.asList(TIME, DIM1, DIM2, "val"), false, 0 ); - @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}, maxNumConcurrentSubTasks={2}") + @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}, maxNumConcurrentSubTasks={2}, useMultiValueDim={3}") public static Iterable constructorFeeder() { return ImmutableList.of( - new Object[]{LockGranularity.TIME_CHUNK, false, 2}, - new Object[]{LockGranularity.TIME_CHUNK, true, 2}, - new Object[]{LockGranularity.SEGMENT, true, 2}, - new Object[]{LockGranularity.SEGMENT, true, 1} // currently spawns subtask instead of running in supervisor + new Object[]{LockGranularity.TIME_CHUNK, !USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM}, + new Object[]{LockGranularity.TIME_CHUNK, USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM}, + new Object[]{LockGranularity.SEGMENT, USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM}, + new Object[]{LockGranularity.SEGMENT, USE_INPUT_FORMAT_API, 1, !USE_MULTIVALUE_DIM}, // will spawn subtask + new Object[]{LockGranularity.SEGMENT, USE_INPUT_FORMAT_API, 2, USE_MULTIVALUE_DIM} // expected to fail ); } + // Interpret empty values in CSV as null + @Rule + public final ProvideSystemProperty noDefaultNullValue = new ProvideSystemProperty( + NullValueHandlingConfig.NULL_HANDLING_CONFIG_STRING, + "false" + ); + private File inputDir; - private SetMultimap intervalToDim1; + private SetMultimap> intervalToDims; private final int maxNumConcurrentSubTasks; + private final boolean useMultivalueDim; public RangePartitionMultiPhaseParallelIndexingTest( LockGranularity lockGranularity, boolean useInputFormatApi, - int maxNumConcurrentSubTasks + int maxNumConcurrentSubTasks, + boolean useMultivalueDim ) { super(lockGranularity, useInputFormatApi); this.maxNumConcurrentSubTasks = maxNumConcurrentSubTasks; + this.useMultivalueDim = useMultivalueDim; } @Override @@ -132,41 +148,65 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP { super.setup(); inputDir = temporaryFolder.newFolder("data"); - intervalToDim1 = createInputFiles(inputDir); + intervalToDims = createInputFiles(inputDir, useMultivalueDim); } - private static SetMultimap createInputFiles(File inputDir) throws IOException + private static SetMultimap> createInputFiles(File inputDir, boolean useMultivalueDim) + throws IOException { - SetMultimap intervalToDim1 = HashMultimap.create(); + SetMultimap> intervalToDims = HashMultimap.create(); for (int fileIndex = 0; fileIndex < NUM_FILE; fileIndex++) { Path path = new File(inputDir, TEST_FILE_NAME_PREFIX + fileIndex).toPath(); try (final Writer writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8)) { - for (int i = 0; i < (NUM_ROW / NUM_DAY); i++) { - for (int d = 0; d < NUM_DAY; d++) { - writeRow(writer, i + d, fileIndex + d, intervalToDim1); + for (int i = 0; i < (NUM_ROW / DIM_FILE_CARDINALITY); i++) { + for (int d = 0; d < DIM_FILE_CARDINALITY; d++) { + int rowIndex = i * DIM_FILE_CARDINALITY + d; + String dim1Value = createDim1Value(rowIndex, fileIndex, useMultivalueDim); + writeRow(writer, i + d, dim1Value, fileIndex, intervalToDims); } } } } - return intervalToDim1; + return intervalToDims; } - private static void writeRow(Writer writer, int day, int fileIndex, Multimap intervalToDim1) - throws IOException + @Nullable + private static String createDim1Value(int rowIndex, int fileIndex, boolean useMultivalueDim) + { + if (rowIndex == fileIndex) { + return null; + } + + String dim1Value = String.valueOf(fileIndex); + return useMultivalueDim ? dim1Value + LIST_DELIMITER + dim1Value : dim1Value; + } + + private static void writeRow( + Writer writer, + int day, + @Nullable String dim1Value, + int fileIndex, + Multimap> intervalToDims + ) throws IOException { Interval interval = Intervals.of("%s-12-%d/%s-12-%d", YEAR, day + 1, YEAR, day + 2); String startDate = interval.getStart().toString("y-M-d"); - String dim1Value = String.valueOf(fileIndex + 10); - writer.write(StringUtils.format("%s,%s,%d th test file\n", startDate, dim1Value, fileIndex)); - intervalToDim1.put(interval, dim1Value); + String dim2Value = "test file " + fileIndex; + String row = startDate + ","; + if (dim1Value != null) { + row += dim1Value; + } + row += "," + dim2Value + "\n"; + writer.write(row); + intervalToDims.put(interval, Arrays.asList(dim1Value, dim2Value)); } @Test public void createsCorrectRangePartitions() throws Exception { - int targetRowsPerSegment = NUM_ROW / NUM_DAY / NUM_PARTITION; + int targetRowsPerSegment = NUM_ROW / DIM_FILE_CARDINALITY / NUM_PARTITION; final Set publishedSegments = runTestTask( PARSE_SPEC, Intervals.of("%s/%s", YEAR, YEAR + 1), @@ -178,9 +218,13 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP DIM1, false ), - maxNumConcurrentSubTasks + maxNumConcurrentSubTasks, + useMultivalueDim ? TaskState.FAILED : TaskState.SUCCESS ); - assertRangePartitions(publishedSegments); + + if (!useMultivalueDim) { + assertRangePartitions(publishedSegments); + } } private void assertRangePartitions(Set publishedSegments) throws IOException @@ -188,16 +232,13 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP Multimap intervalToSegments = ArrayListMultimap.create(); publishedSegments.forEach(s -> intervalToSegments.put(s.getInterval(), s)); - SortedSet publishedIntervals = new TreeSet<>(Comparators.intervalsByStartThenEnd()); - publishedIntervals.addAll(intervalToSegments.keySet()); + Set publishedIntervals = intervalToSegments.keySet(); assertHasExpectedIntervals(publishedIntervals); - Interval firstInterval = publishedIntervals.first(); - Interval lastInterval = publishedIntervals.last(); File tempSegmentDir = temporaryFolder.newFolder(); intervalToSegments.asMap().forEach((interval, segments) -> { - assertNumPartition(interval, segments, firstInterval, lastInterval); + assertNumPartition(segments); List allValues = new ArrayList<>(NUM_ROW); for (DataSegment segment : segments) { @@ -212,22 +253,12 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP private void assertHasExpectedIntervals(Set publishedSegmentIntervals) { - Assert.assertEquals(intervalToDim1.keySet(), publishedSegmentIntervals); + Assert.assertEquals(intervalToDims.keySet(), publishedSegmentIntervals); } - private static void assertNumPartition( - Interval interval, - Collection segments, - Interval firstInterval, - Interval lastInterval - ) + private static void assertNumPartition(Collection segments) { - int expectedNumPartition = NUM_PARTITION; - if (interval.equals(firstInterval) || interval.equals(lastInterval)) { - expectedNumPartition -= 1; - } - expectedNumPartition *= NUM_DAY; - Assert.assertEquals(expectedNumPartition, segments.size()); + Assert.assertEquals(NUM_PARTITION, segments.size()); } private List getColumnValues(DataSegment segment, File tempDir) @@ -253,18 +284,24 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP } if (end != null) { - Assert.assertThat(value.compareTo(end), Matchers.lessThan(0)); + if (value == null) { + Assert.assertNull("null values should be in first partition", start); + } else { + Assert.assertThat(value.compareTo(end), Matchers.lessThan(0)); + } } } } private void assertIntervalHasAllExpectedValues(Interval interval, List actualValues) { - List expectedValues = new ArrayList<>(intervalToDim1.get(interval)); - Assert.assertEquals(expectedValues.size(), actualValues.size()); - Collections.sort(expectedValues); - Collections.sort(actualValues); - Assert.assertEquals(expectedValues, actualValues); + List expectedValues = intervalToDims.get(interval) + .stream() + .map(d -> (String) d.get(0)) + .sorted(Comparators.naturalNullsFirst()) + .collect(Collectors.toList()); + actualValues.sort(Comparators.naturalNullsFirst()); + Assert.assertEquals(interval.toString(), expectedValues, actualValues); } @Override