Add maxNumFiles to splitHintSpec (#10243)

* Add maxNumFiles to splitHintSpec

* missing link

* fix build failure; use maxNumFiles for integration tests

* spelling

* lower default

* Update docs/ingestion/native-batch.md

Co-authored-by: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com>

* address comments; change default maxSplitSize

* spelling

* typos and doc

* same change for segments splitHintSpec

* fix build

* fix build

Co-authored-by: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com>
Jihoon Son 2020-08-21 09:43:58 -07:00 committed by GitHub
parent 7620b0c54e
commit b5b3e6ecce
25 changed files with 197 additions and 57 deletions

@@ -22,7 +22,9 @@ package org.apache.druid.data.input;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import org.apache.druid.java.util.common.HumanReadableBytes;
import javax.annotation.Nullable;
import java.util.ArrayList;
@@ -43,22 +45,55 @@ public class MaxSizeSplitHintSpec implements SplitHintSpec
public static final String TYPE = "maxSize";
@VisibleForTesting
static final long DEFAULT_MAX_SPLIT_SIZE = 512 * 1024 * 1024;
static final HumanReadableBytes DEFAULT_MAX_SPLIT_SIZE = new HumanReadableBytes("1GiB");
private final long maxSplitSize;
/**
* There are two known issues when a split contains a large list of files.
*
* - 'jute.maxbuffer' in ZooKeeper. This system property controls the max size of ZNode. As its default is 500KB,
* task allocation can fail if the serialized ingestion spec is larger than this limit.
* - 'max_allowed_packet' in MySQL. This is the max size of a communication packet sent to a MySQL server.
* The default is either 64MB or 4MB depending on MySQL version. Updating metadata store can fail if the serialized
* ingestion spec is larger than this limit.
*
* The default is conservatively chosen as 1000.
*/
@VisibleForTesting
static final int DEFAULT_MAX_NUM_FILES = 1000;
private final HumanReadableBytes maxSplitSize;
private final int maxNumFiles;
@JsonCreator
public MaxSizeSplitHintSpec(@JsonProperty("maxSplitSize") @Nullable Long maxSplitSize)
public MaxSizeSplitHintSpec(
@JsonProperty("maxSplitSize") @Nullable HumanReadableBytes maxSplitSize,
@JsonProperty("maxNumFiles") @Nullable Integer maxNumFiles
)
{
this.maxSplitSize = maxSplitSize == null ? DEFAULT_MAX_SPLIT_SIZE : maxSplitSize;
this.maxNumFiles = maxNumFiles == null ? DEFAULT_MAX_NUM_FILES : maxNumFiles;
Preconditions.checkArgument(this.maxSplitSize.getBytes() > 0, "maxSplitSize should be larger than 0");
Preconditions.checkArgument(this.maxNumFiles > 0, "maxNumFiles should be larger than 0");
}
@VisibleForTesting
public MaxSizeSplitHintSpec(long maxSplitSize, @Nullable Integer maxNumFiles)
{
this(new HumanReadableBytes(maxSplitSize), maxNumFiles);
}
@JsonProperty
public long getMaxSplitSize()
public HumanReadableBytes getMaxSplitSize()
{
return maxSplitSize;
}
@JsonProperty
public int getMaxNumFiles()
{
return maxNumFiles;
}
@Override
public <T> Iterator<List<T>> split(Iterator<T> inputIterator, Function<T, InputFileAttribute> inputAttributeExtractor)
{
@@ -68,6 +103,7 @@ public class MaxSizeSplitHintSpec implements SplitHintSpec
);
return new Iterator<List<T>>()
{
private final long maxSplitSizeBytes = maxSplitSize.getBytes();
private T peeking;
@Override
@@ -84,12 +120,13 @@ public class MaxSizeSplitHintSpec implements SplitHintSpec
}
final List<T> current = new ArrayList<>();
long splitSize = 0;
while (splitSize < maxSplitSize && (peeking != null || nonEmptyFileOnlyIterator.hasNext())) {
while (splitSize < maxSplitSizeBytes && (peeking != null || nonEmptyFileOnlyIterator.hasNext())) {
if (peeking == null) {
peeking = nonEmptyFileOnlyIterator.next();
}
final long size = inputAttributeExtractor.apply(peeking).getSize();
if (current.isEmpty() || splitSize + size < maxSplitSize) {
if (current.isEmpty() // each split should have at least one file even if the file is larger than maxSplitSize
|| (splitSize + size < maxSplitSizeBytes && current.size() < maxNumFiles)) {
current.add(peeking);
splitSize += size;
peeking = null;
@@ -113,12 +150,13 @@ public class MaxSizeSplitHintSpec implements SplitHintSpec
return false;
}
MaxSizeSplitHintSpec that = (MaxSizeSplitHintSpec) o;
return maxSplitSize == that.maxSplitSize;
return maxNumFiles == that.maxNumFiles &&
Objects.equals(maxSplitSize, that.maxSplitSize);
}
@Override
public int hashCode()
{
return Objects.hash(maxSplitSize);
return Objects.hash(maxSplitSize, maxNumFiles);
}
}
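
For context, here is a minimal sketch of how the reworked spec groups inputs, mirroring the testSplitSmallInputsWithMaxNumFilesEachSplitShouldHaveLessFilesAssigned case added later in this diff. The example class name is invented; the constructor, getters, and expected split counts come from this patch.

import com.google.common.collect.Lists;
import org.apache.druid.data.input.InputFileAttribute;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.java.util.common.HumanReadableBytes;

import java.util.List;
import java.util.function.Function;
import java.util.stream.IntStream;

public class MaxSizeSplitHintSpecSketch
{
  public static void main(String[] args)
  {
    // maxSplitSize is generous (500M), so maxNumFiles = 3 is the limit that actually binds here.
    MaxSizeSplitHintSpec spec = new MaxSizeSplitHintSpec(new HumanReadableBytes("500M"), 3);
    Function<Integer, InputFileAttribute> attributeExtractor = InputFileAttribute::new;

    // Ten inputs of size 3 are grouped into splits of at most 3 files each.
    List<List<Integer>> splits = Lists.newArrayList(
        spec.split(IntStream.generate(() -> 3).limit(10).iterator(), attributeExtractor)
    );

    // Per the unit test in this patch: 4 splits holding 3, 3, 3, and 1 files.
    splits.forEach(split -> System.out.println(split.size()));
  }
}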

@@ -21,6 +21,7 @@ package org.apache.druid.data.input;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.HumanReadableBytes;
import javax.annotation.Nullable;
import java.util.Iterator;
@@ -41,28 +42,40 @@ public class SegmentsSplitHintSpec implements SplitHintSpec
{
public static final String TYPE = "segments";
private static final long DEFAULT_MAX_INPUT_SEGMENT_BYTES_PER_TASK = 500 * 1024 * 1024;
private static final HumanReadableBytes DEFAULT_MAX_INPUT_SEGMENT_BYTES_PER_TASK = new HumanReadableBytes("1GiB");
private static final int DEFAULT_MAX_NUM_SEGMENTS = 1000;
/**
* Maximum number of bytes of input segments to process in a single task.
* If a single segment is larger than this number, it will be processed by itself in a single task.
*/
private final long maxInputSegmentBytesPerTask;
private final HumanReadableBytes maxInputSegmentBytesPerTask;
private final int maxNumSegments;
@JsonCreator
public SegmentsSplitHintSpec(@JsonProperty("maxInputSegmentBytesPerTask") @Nullable Long maxInputSegmentBytesPerTask)
public SegmentsSplitHintSpec(
@JsonProperty("maxInputSegmentBytesPerTask") @Nullable HumanReadableBytes maxInputSegmentBytesPerTask,
@JsonProperty("maxNumSegments") @Nullable Integer maxNumSegments
)
{
this.maxInputSegmentBytesPerTask = maxInputSegmentBytesPerTask == null
? DEFAULT_MAX_INPUT_SEGMENT_BYTES_PER_TASK
: maxInputSegmentBytesPerTask;
this.maxNumSegments = maxNumSegments == null ? DEFAULT_MAX_NUM_SEGMENTS : maxNumSegments;
}
@JsonProperty
public long getMaxInputSegmentBytesPerTask()
public HumanReadableBytes getMaxInputSegmentBytesPerTask()
{
return maxInputSegmentBytesPerTask;
}
@JsonProperty
public int getMaxNumSegments()
{
return maxNumSegments;
}
@Override
public <T> Iterator<List<T>> split(Iterator<T> inputIterator, Function<T, InputFileAttribute> inputAttributeExtractor)
{
@@ -80,13 +93,14 @@
return false;
}
SegmentsSplitHintSpec that = (SegmentsSplitHintSpec) o;
return maxInputSegmentBytesPerTask == that.maxInputSegmentBytesPerTask;
return maxNumSegments == that.maxNumSegments &&
Objects.equals(maxInputSegmentBytesPerTask, that.maxInputSegmentBytesPerTask);
}
@Override
public int hashCode()
{
return Objects.hash(maxInputSegmentBytesPerTask);
return Objects.hash(maxInputSegmentBytesPerTask, maxNumSegments);
}
@Override
@@ -94,6 +108,7 @@
{
return "SegmentsSplitHintSpec{" +
"maxInputSegmentBytesPerTask=" + maxInputSegmentBytesPerTask +
", maxNumSegments=" + maxNumSegments +
'}';
}
}
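
A quick sketch of constructing the reworked SegmentsSplitHintSpec, using only the constructor, getters, and defaults shown above. The example class name is invented, and the byte count in the comment assumes "1GiB" parses to 2^30 bytes.

import org.apache.druid.data.input.SegmentsSplitHintSpec;
import org.apache.druid.java.util.common.HumanReadableBytes;

public class SegmentsSplitHintSpecSketch
{
  public static void main(String[] args)
  {
    // Explicit limits: at most 500M of segment bytes and at most 100 segments per subtask.
    SegmentsSplitHintSpec explicit = new SegmentsSplitHintSpec(new HumanReadableBytes("500M"), 100);
    System.out.println(explicit); // toString() now reports both fields

    // Nulls fall back to the new defaults: 1GiB and 1000.
    SegmentsSplitHintSpec defaults = new SegmentsSplitHintSpec(null, null);
    System.out.println(defaults.getMaxInputSegmentBytesPerTask().getBytes()); // 1073741824, assuming 1GiB = 2^30 bytes
    System.out.println(defaults.getMaxNumSegments());                         // 1000
  }
}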

@@ -35,7 +35,7 @@ import java.util.stream.Stream;
*/
public interface SplittableInputSource<T> extends InputSource
{
SplitHintSpec DEFAULT_SPLIT_HINT_SPEC = new MaxSizeSplitHintSpec(null);
SplitHintSpec DEFAULT_SPLIT_HINT_SPEC = new MaxSizeSplitHintSpec(null, null);
@JsonIgnore
@Override

@@ -19,11 +19,15 @@
package org.apache.druid.data.input;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.util.List;
@@ -32,27 +36,66 @@ import java.util.stream.IntStream;
public class MaxSizeSplitHintSpecTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void testSerde() throws IOException
{
final ObjectMapper mapper = new ObjectMapper();
final MaxSizeSplitHintSpec original = new MaxSizeSplitHintSpec(1024L);
final MaxSizeSplitHintSpec original = new MaxSizeSplitHintSpec(new HumanReadableBytes(1024L), 20_000);
final byte[] bytes = mapper.writeValueAsBytes(original);
final MaxSizeSplitHintSpec fromJson = (MaxSizeSplitHintSpec) mapper.readValue(bytes, SplitHintSpec.class);
Assert.assertEquals(original, fromJson);
}
@Test
public void testCreateWithNullReturningDefaultMaxSplitSize()
public void testReadFromJson() throws JsonProcessingException
{
Assert.assertEquals(MaxSizeSplitHintSpec.DEFAULT_MAX_SPLIT_SIZE, new MaxSizeSplitHintSpec(null).getMaxSplitSize());
final ObjectMapper mapper = new ObjectMapper();
final String json = "{"
+ " \"type\":\"maxSize\","
+ " \"maxSplitSize\":1024,"
+ " \"maxNumFiles\":20000"
+ "}\n";
final MaxSizeSplitHintSpec fromJson = (MaxSizeSplitHintSpec) mapper.readValue(json, SplitHintSpec.class);
Assert.assertEquals(new MaxSizeSplitHintSpec(new HumanReadableBytes(1024L), 20_000), fromJson);
}
@Test
public void testConstructorWith0MaxNumFiles()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("maxNumFiles should be larger than 0");
new MaxSizeSplitHintSpec(null, 0);
}
@Test
public void testConstructorWith0MaxSplitSize()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("maxSplitSize should be larger than 0");
new MaxSizeSplitHintSpec(0, null);
}
@Test
public void testDefaults()
{
Assert.assertEquals(
MaxSizeSplitHintSpec.DEFAULT_MAX_SPLIT_SIZE,
new MaxSizeSplitHintSpec(null, null).getMaxSplitSize()
);
Assert.assertEquals(
MaxSizeSplitHintSpec.DEFAULT_MAX_NUM_FILES,
new MaxSizeSplitHintSpec(null, null).getMaxNumFiles()
);
}
@Test
public void testSplitSmallInputsGroupingIntoLargerSplits()
{
final int eachInputSize = 3;
final MaxSizeSplitHintSpec splitHintSpec = new MaxSizeSplitHintSpec(10L);
final MaxSizeSplitHintSpec splitHintSpec = new MaxSizeSplitHintSpec(new HumanReadableBytes(10L), 10_000);
final Function<Integer, InputFileAttribute> inputAttributeExtractor = InputFileAttribute::new;
final List<List<Integer>> splits = Lists.newArrayList(
splitHintSpec.split(IntStream.generate(() -> eachInputSize).limit(10).iterator(), inputAttributeExtractor)
@@ -68,7 +111,7 @@ public class MaxSizeSplitHintSpecTest
public void testSplitLargeInputsReturningSplitsOfSingleInput()
{
final int eachInputSize = 15;
final MaxSizeSplitHintSpec splitHintSpec = new MaxSizeSplitHintSpec(10L);
final MaxSizeSplitHintSpec splitHintSpec = new MaxSizeSplitHintSpec(new HumanReadableBytes(10L), 10_000);
final Function<Integer, InputFileAttribute> inputAttributeExtractor = InputFileAttribute::new;
final List<List<Integer>> splits = Lists.newArrayList(
splitHintSpec.split(IntStream.generate(() -> eachInputSize).limit(10).iterator(), inputAttributeExtractor)
@@ -79,11 +122,27 @@
}
}
@Test
public void testSplitSmallInputsWithMaxNumFilesEachSplitShouldHaveLessFilesAssigned()
{
final int eachInputSize = 3;
final MaxSizeSplitHintSpec splitHintSpec = new MaxSizeSplitHintSpec(new HumanReadableBytes("500M"), 3);
final Function<Integer, InputFileAttribute> inputAttributeExtractor = InputFileAttribute::new;
final List<List<Integer>> splits = Lists.newArrayList(
splitHintSpec.split(IntStream.generate(() -> eachInputSize).limit(10).iterator(), inputAttributeExtractor)
);
Assert.assertEquals(4, splits.size());
Assert.assertEquals(3, splits.get(0).size());
Assert.assertEquals(3, splits.get(1).size());
Assert.assertEquals(3, splits.get(2).size());
Assert.assertEquals(1, splits.get(3).size());
}
@Test
public void testSplitSkippingEmptyInputs()
{
final int nonEmptyInputSize = 3;
final MaxSizeSplitHintSpec splitHintSpec = new MaxSizeSplitHintSpec(10L);
final MaxSizeSplitHintSpec splitHintSpec = new MaxSizeSplitHintSpec(new HumanReadableBytes(10L), null);
final Function<Integer, InputFileAttribute> inputAttributeExtractor = InputFileAttribute::new;
final IntStream dataStream = IntStream.concat(
IntStream.concat(
@@ -105,6 +164,10 @@ public class MaxSizeSplitHintSpecTest
@Test
public void testEquals()
{
EqualsVerifier.forClass(MaxSizeSplitHintSpec.class).withNonnullFields("maxSplitSize").usingGetClass().verify();
EqualsVerifier.forClass(MaxSizeSplitHintSpec.class)
.withNonnullFields("maxSplitSize")
.withNonnullFields("maxNumFiles")
.usingGetClass()
.verify();
}
}

@@ -25,6 +25,7 @@ import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.utils.Streams;
import org.easymock.EasyMock;
import org.junit.Assert;
@@ -69,11 +70,11 @@ public class LocalInputSourceTest
public void testCreateSplitsRespectingSplitHintSpec()
{
final long fileSize = 15;
final long maxSplitSize = 50;
final HumanReadableBytes maxSplitSize = new HumanReadableBytes(50L);
final Set<File> files = mockFiles(10, fileSize);
final LocalInputSource inputSource = new LocalInputSource(null, null, files);
final List<InputSplit<List<File>>> splits = inputSource
.createSplits(new NoopInputFormat(), new MaxSizeSplitHintSpec(maxSplitSize))
.createSplits(new NoopInputFormat(), new MaxSizeSplitHintSpec(maxSplitSize, null))
.collect(Collectors.toList());
Assert.assertEquals(4, splits.size());
Assert.assertEquals(3, splits.get(0).get().size());
@@ -86,12 +87,12 @@
public void testEstimateNumSplitsRespectingSplitHintSpec()
{
final long fileSize = 13;
final long maxSplitSize = 40;
final HumanReadableBytes maxSplitSize = new HumanReadableBytes(40L);
final Set<File> files = mockFiles(10, fileSize);
final LocalInputSource inputSource = new LocalInputSource(null, null, files);
Assert.assertEquals(
4,
inputSource.estimateNumSplits(new NoopInputFormat(), new MaxSizeSplitHintSpec(maxSplitSize))
inputSource.estimateNumSplits(new NoopInputFormat(), new MaxSizeSplitHintSpec(maxSplitSize, null))
);
}

@@ -232,7 +232,8 @@ The size-based split hint spec is respected by all splittable input sources exce
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should always be `maxSize`.|none|yes|
|maxSplitSize|Maximum number of bytes of input files to process in a single task. If a single file is larger than this number, it will be processed by itself in a single task (Files are never split across tasks yet).|500MB|no|
|maxSplitSize|Maximum number of bytes of input files to process in a single subtask. If a single file is larger than this number, it will be processed by itself in a single subtask (Files are never split across tasks yet). Note that one subtask will not process more files than `maxNumFiles` even when their total size is smaller than `maxSplitSize`. [Human-readable format](../configuration/human-readable-byte.md) is supported.|1GiB|no|
|maxNumFiles|Maximum number of input files to process in a single subtask. This limit is to avoid task failures when the ingestion spec is too long. There are two known limits on the max size of serialized ingestion spec, i.e., the max ZNode size in ZooKeeper (`jute.maxbuffer`) and the max packet size in MySQL (`max_allowed_packet`). These can make ingestion tasks fail if the serialized ingestion spec size hits one of them. Note that one subtask will not process more data than `maxSplitSize` even when the total number of files is smaller than `maxNumFiles`.|1000|no|
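
To make the documented JSON shape concrete, here is a hedged sketch of parsing such a splitHintSpec with Jackson, modeled on the testReadFromJson case earlier in this diff. A real deployment would use Druid's configured ObjectMapper; the class name below is invented.

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.data.input.SplitHintSpec;

public class SplitHintSpecJsonSketch
{
  public static void main(String[] args) throws Exception
  {
    // The splitHintSpec block of a parallel task's tuningConfig, per the table above.
    String json = "{\"type\":\"maxSize\",\"maxSplitSize\":1073741824,\"maxNumFiles\":1000}";

    ObjectMapper mapper = new ObjectMapper();
    MaxSizeSplitHintSpec spec = (MaxSizeSplitHintSpec) mapper.readValue(json, SplitHintSpec.class);

    System.out.println(spec.getMaxSplitSize().getBytes()); // 1073741824
    System.out.println(spec.getMaxNumFiles());             // 1000
  }
}

Per the table, maxSplitSize should also accept a human-readable string such as "1GiB" in place of the raw byte count.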
#### Segments Split Hint Spec
@@ -241,7 +242,8 @@ The segments split hint spec is used only for [`DruidInputSource`](#druid-input-
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should always be `segments`.|none|yes|
|maxInputSegmentBytesPerTask|Maximum number of bytes of input segments to process in a single task. If a single segment is larger than this number, it will be processed by itself in a single task (input segments are never split across tasks).|500MB|no|
|maxInputSegmentBytesPerTask|Maximum number of bytes of input segments to process in a single subtask. If a single segment is larger than this number, it will be processed by itself in a single subtask (input segments are never split across tasks). Note that one subtask will not process more segments than `maxNumSegments` even when their total size is smaller than `maxInputSegmentBytesPerTask`. [Human-readable format](../configuration/human-readable-byte.md) is supported.|1GiB|no|
|maxNumSegments|Maximum number of input segments to process in a single subtask. This limit is to avoid task failures when the ingestion spec is too long. There are two known limits on the max size of serialized ingestion spec, i.e., the max ZNode size in ZooKeeper (`jute.maxbuffer`) and the max packet size in MySQL (`max_allowed_packet`). These can make ingestion tasks fail if the serialized ingestion spec size hits one of them. Note that one subtask will not process more data than `maxInputSegmentBytesPerTask` even when the total number of segments is smaller than `maxNumSegments`.|1000|no|
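
Since DruidInputSource converts this hint into a MaxSizeSplitHintSpec (see the DruidInputSource hunk later in this diff), the two specs map field for field. Below is a small sketch of that mapping using only the public getters from this patch; the helper class and method are illustrative, not part of the patch.

import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.data.input.SegmentsSplitHintSpec;
import org.apache.druid.data.input.SplitHintSpec;

public class SegmentsHintConversionSketch
{
  // Mirrors the conversion added to DruidInputSource:
  // maxInputSegmentBytesPerTask -> maxSplitSize, maxNumSegments -> maxNumFiles.
  static SplitHintSpec toMaxSize(SplitHintSpec splitHintSpec)
  {
    if (splitHintSpec instanceof SegmentsSplitHintSpec) {
      SegmentsSplitHintSpec segmentsSpec = (SegmentsSplitHintSpec) splitHintSpec;
      return new MaxSizeSplitHintSpec(
          segmentsSpec.getMaxInputSegmentBytesPerTask(),
          segmentsSpec.getMaxNumSegments()
      );
    }
    return splitHintSpec;
  }

  public static void main(String[] args)
  {
    MaxSizeSplitHintSpec converted = (MaxSizeSplitHintSpec) toMaxSize(new SegmentsSplitHintSpec(null, null));
    System.out.println(converted.getMaxNumFiles()); // 1000, i.e. the segments spec's default maxNumSegments
  }
}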
### `partitionsSpec`

@@ -51,6 +51,7 @@ import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
@@ -339,7 +340,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new MaxSizeSplitHintSpec(1L) // set maxSplitSize to 1 so that each inputSplit has only one object
new MaxSizeSplitHintSpec(null, 1)
);
Assert.assertEquals(EXPECTED_COORDS, splits.map(InputSplit::get).collect(Collectors.toList()));
@@ -365,7 +366,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new MaxSizeSplitHintSpec(CONTENT.length * 3L)
new MaxSizeSplitHintSpec(new HumanReadableBytes(CONTENT.length * 3L), null)
);
Assert.assertEquals(

@@ -163,7 +163,7 @@ public class AzureInputSourceTest extends EasyMockSupport
);
Stream<InputSplit<List<CloudObjectLocation>>> cloudObjectStream = azureInputSource.getPrefixesSplitStream(
new MaxSizeSplitHintSpec(1L)
new MaxSizeSplitHintSpec(null, 1)
);
List<List<CloudObjectLocation>> actualCloudLocationList = cloudObjectStream.map(InputSplit::get)

@@ -44,6 +44,7 @@ import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
@@ -170,7 +171,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new MaxSizeSplitHintSpec(1L) // set maxSplitSize to 1 so that each inputSplit has only one object
new MaxSizeSplitHintSpec(null, 1)
);
Assert.assertEquals(EXPECTED_OBJECTS, splits.map(InputSplit::get).collect(Collectors.toList()));
@@ -192,7 +193,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new MaxSizeSplitHintSpec(CONTENT.length * 3L)
new MaxSizeSplitHintSpec(new HumanReadableBytes(CONTENT.length * 3L), null)
);
Assert.assertEquals(

@@ -251,7 +251,7 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
public void hasCorrectSplits() throws IOException
{
// Set maxSplitSize to 1 so that each inputSplit has only one object
List<InputSplit<List<Path>>> splits = target.createSplits(null, new MaxSizeSplitHintSpec(1L))
List<InputSplit<List<Path>>> splits = target.createSplits(null, new MaxSizeSplitHintSpec(1L, null))
.collect(Collectors.toList());
splits.forEach(split -> Assert.assertEquals(1, split.get().size()));
Set<Path> actualPaths = splits.stream()
@@ -264,7 +264,7 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
@Test
public void createSplitsRespectSplitHintSpec() throws IOException
{
List<InputSplit<List<Path>>> splits = target.createSplits(null, new MaxSizeSplitHintSpec(7L))
List<InputSplit<List<Path>>> splits = target.createSplits(null, new MaxSizeSplitHintSpec(7L, null))
.collect(Collectors.toList());
Assert.assertEquals(2, splits.size());
Assert.assertEquals(2, splits.get(0).get().size());
@@ -275,15 +275,14 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
public void hasCorrectNumberOfSplits() throws IOException
{
// Set maxSplitSize to 1 so that each inputSplit has only one object
int numSplits = target.estimateNumSplits(null, new MaxSizeSplitHintSpec(1L));
int numSplits = target.estimateNumSplits(null, new MaxSizeSplitHintSpec(1L, null));
Assert.assertEquals(NUM_FILE, numSplits);
}
@Test
public void createCorrectInputSourceWithSplit() throws Exception
{
// Set maxSplitSize to 1 so that each inputSplit has only one object
List<InputSplit<List<Path>>> splits = target.createSplits(null, new MaxSizeSplitHintSpec(1L))
List<InputSplit<List<Path>>> splits = target.createSplits(null, new MaxSizeSplitHintSpec(null, 1))
.collect(Collectors.toList());
for (InputSplit<List<Path>> split : splits) {

@@ -52,6 +52,7 @@ import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
@@ -390,7 +391,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new MaxSizeSplitHintSpec(1L) // set maxSplitSize to 1 so that each inputSplit has only one object
new MaxSizeSplitHintSpec(null, 1)
);
Assert.assertEquals(EXPECTED_COORDS, splits.map(InputSplit::get).collect(Collectors.toList()));
@@ -417,7 +418,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new MaxSizeSplitHintSpec(CONTENT.length * 3L)
new MaxSizeSplitHintSpec(new HumanReadableBytes(CONTENT.length * 3L), null)
);
Assert.assertEquals(

@@ -38,6 +38,7 @@ import org.apache.druid.indexing.common.ReingestionTimelineUtils;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.input.DruidInputSource;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.EmittingLogger;
@@ -289,7 +290,14 @@ public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory<Input
retryPolicyFactory,
dataSource,
interval,
splitHintSpec == null ? new SegmentsSplitHintSpec(maxInputSegmentBytesPerTask) : splitHintSpec
splitHintSpec == null
? new SegmentsSplitHintSpec(
maxInputSegmentBytesPerTask == null
? null
: new HumanReadableBytes(maxInputSegmentBytesPerTask),
null
)
: splitHintSpec
)
);
}

@@ -233,7 +233,7 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
retryPolicyFactory,
dataSource,
interval,
splitHintSpec == null ? new MaxSizeSplitHintSpec(null) : splitHintSpec
splitHintSpec == null ? SplittableInputSource.DEFAULT_SPLIT_HINT_SPEC : splitHintSpec
)
);
} else {
@@ -253,7 +253,7 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
retryPolicyFactory,
dataSource,
interval,
splitHintSpec == null ? new MaxSizeSplitHintSpec(null) : splitHintSpec
splitHintSpec == null ? SplittableInputSource.DEFAULT_SPLIT_HINT_SPEC : splitHintSpec
)
);
} else {
@@ -294,8 +294,10 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
{
final SplitHintSpec convertedSplitHintSpec;
if (splitHintSpec instanceof SegmentsSplitHintSpec) {
final SegmentsSplitHintSpec segmentsSplitHintSpec = (SegmentsSplitHintSpec) splitHintSpec;
convertedSplitHintSpec = new MaxSizeSplitHintSpec(
((SegmentsSplitHintSpec) splitHintSpec).getMaxInputSegmentBytesPerTask()
segmentsSplitHintSpec.getMaxInputSegmentBytesPerTask(),
segmentsSplitHintSpec.getMaxNumSegments()
);
} else {
convertedSplitHintSpec = splitHintSpec;

@@ -44,6 +44,7 @@ import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.BitmapSerde.DefaultBitmapSerdeFactory;
@@ -85,7 +86,7 @@ public class ClientCompactionTaskQuerySerdeTest
40000,
2000L,
30000L,
new SegmentsSplitHintSpec(100000L),
new SegmentsSplitHintSpec(new HumanReadableBytes(100000L), 10),
new IndexSpec(
new DefaultBitmapSerdeFactory(),
CompressionStrategy.LZ4,
@@ -174,7 +175,7 @@
2000L,
null,
null,
new SegmentsSplitHintSpec(100000L),
new SegmentsSplitHintSpec(new HumanReadableBytes(100000L), 10),
new DynamicPartitionsSpec(100, 30000L),
new IndexSpec(
new DefaultBitmapSerdeFactory(),
@@ -217,7 +218,7 @@
40000,
2000L,
30000L,
new SegmentsSplitHintSpec(100000L),
new SegmentsSplitHintSpec(new HumanReadableBytes(100000L), 10),
new IndexSpec(
new DefaultBitmapSerdeFactory(),
CompressionStrategy.LZ4,

@@ -270,7 +270,7 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
RETRY_POLICY_FACTORY,
DATA_SOURCE,
INTERVAL_TO_INDEX,
new SegmentsSplitHintSpec(1L) // each segment gets its own split with this config
new SegmentsSplitHintSpec(null, 1)
)
);

@@ -239,7 +239,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
null,
null,
null,
new MaxSizeSplitHintSpec(1L), // set maxSplitSize to 1 so that each split has only one file.
new MaxSizeSplitHintSpec(null, 1),
partitionsSpec,
null,
null,

@@ -37,7 +37,8 @@ import java.util.function.Function;
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITBestEffortRollupParallelIndexTest extends AbstractITBatchIndexTest
{
// The task specs here use the MaxSizeSplitHintSpec with maxSplitSize of 1. This is to create splits per file.
// This ingestion spec has a splitHintSpec with maxNumFiles of 1 to test whether or not the task can handle
// maxNumFiles of 1 properly.
private static final String INDEX_TASK = "/indexer/wikipedia_parallel_index_task.json";
private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_parallel_index_queries.json";
private static final String REINDEX_TASK = "/indexer/wikipedia_parallel_reindex_task.json";

@@ -38,7 +38,8 @@ import java.util.function.Function;
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITPerfectRollupParallelIndexTest extends AbstractITBatchIndexTest
{
// The task specs here use the MaxSizeSplitHintSpec with maxSplitSize of 1. This is to create splits per file.
// This ingestion spec has a splitHintSpec with maxNumFiles of 1 to test whether or not the task can handle
// maxNumFiles of 1 properly.
private static final String INDEX_TASK = "/indexer/wikipedia_parallel_index_task.json";
private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_parallel_index_queries.json";
private static final String INDEX_DATASOURCE = "wikipedia_parallel_index_test";

@@ -80,7 +80,7 @@
"partitionsSpec": %%PARTITIONS_SPEC%%,
"splitHintSpec": {
"type": "maxSize",
"maxSplitSize": 1
"maxNumFiles": 1
}
}
}

@@ -79,7 +79,7 @@
"maxNumConcurrentSubTasks": 4,
"splitHintSpec": {
"type": "maxSize",
"maxSplitSize": 1
"maxNumFiles": 1
},
"forceGuaranteedRollup": %%FORCE_GUARANTEED_ROLLUP%%,
"partitionsSpec": %%PARTITIONS_SPEC%%

@@ -61,7 +61,7 @@
"forceGuaranteedRollup": "%%FORCE_GUARANTEED_ROLLUP%%",
"splitHintSpec": {
"type": "maxSize",
"maxSplitSize": 1
"maxNumFiles": 1
},
"partitionsSpec": %%PARTITIONS_SPEC%%
}

@@ -67,7 +67,7 @@
"forceGuaranteedRollup": "%%FORCE_GUARANTEED_ROLLUP%%",
"splitHintSpec": {
"type": "maxSize",
"maxSplitSize": 1
"maxNumFiles": 1
},
"partitionsSpec": %%PARTITIONS_SPEC%%
}

@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.SegmentsSplitHintSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.CompressionFactory;
import org.apache.druid.segment.data.CompressionStrategy;
@@ -164,7 +165,7 @@ public class DataSourceCompactionConfigTest
1000,
10000L,
2000L,
new SegmentsSplitHintSpec(10000L),
new SegmentsSplitHintSpec(new HumanReadableBytes(10000L), null),
new IndexSpec(
new RoaringBitmapSerdeFactory(false),
CompressionStrategy.LZF,

@@ -22,6 +22,7 @@ package org.apache.druid.server.coordinator;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.SegmentsSplitHintSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.CompressionFactory;
import org.apache.druid.segment.data.CompressionStrategy;
@@ -55,7 +56,7 @@ public class UserCompactionTaskQueryTuningConfigTest
1000,
10000L,
2000L,
new SegmentsSplitHintSpec(42L),
new SegmentsSplitHintSpec(new HumanReadableBytes(42L), null),
new IndexSpec(
new RoaringBitmapSerdeFactory(false),
CompressionStrategy.LZF,

@@ -19,6 +19,7 @@
# one word per line, to define a file override use ' - filename'
# where filename is relative to this configuration file
32-bit
500MiB
64-bit
ACL
APIs
@@ -281,6 +282,8 @@ lookback
lookups
mapreduce
masse
maxNumFiles
maxNumSegments
max_map_count
memcached
mergeable
@@ -373,6 +376,7 @@ subqueries
subquery
subsecond
substring
subtask
subtasks
symlink
tiering