mirror of https://github.com/apache/druid.git
Add forceExtendableShardSpecs option to Hadoop indexing, IndexTask. (#3473)
Fixes #3241.
parent 05ea88df5c
commit 27bd5cb13a
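In brief, when the new flag is set, segments are published with an extendable NumberedShardSpec instead of the shardSpec that was used for partitioning, so that other ingestion (notably the Kafka indexing service) can later append partitions to the same interval. As a hedged illustration only (not taken from this commit), the first of two published segments would carry shardSpec metadata roughly like the following, where a hash-partitioned Hadoop job would otherwise publish a "hashed" spec:

```json
{
  "type": "numbered",
  "partitionNum": 0,
  "partitions": 2
}
```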
@@ -26,8 +26,9 @@ Core extensions are maintained by Druid committers.
 |druid-datasketches|Support for approximate counts and set operations with [DataSketches](http://datasketches.github.io/).|[link](../development/extensions-core/datasketches-aggregators.html)|
 |druid-hdfs-storage|HDFS deep storage.|[link](../development/extensions-core/hdfs.html)|
 |druid-histogram|Approximate histograms and quantiles aggregator.|[link](../development/extensions-core/approximate-histograms.html)|
-|druid-kafka-eight|Kafka ingest firehose (high level consumer).|[link](../development/extensions-core/kafka-eight-firehose.html)|
+|druid-kafka-eight|Kafka ingest firehose (high level consumer) for realtime nodes.|[link](../development/extensions-core/kafka-eight-firehose.html)|
 |druid-kafka-extraction-namespace|Kafka-based namespaced lookup. Requires namespace lookup extension.|[link](../development/extensions-core/kafka-extraction-namespace.html)|
+|druid-kafka-indexing-service|Supervised exactly-once Kafka ingestion for the indexing service.|[link](../development/extensions-core/kafka-ingestion.html)|
 |druid-lookups-cached-global|A module for [lookups](../querying/lookups.html) providing a jvm-global eager caching for lookups. It provides JDBC and URI implementations for fetching lookup data.|[link](../development/extensions-core/lookups-cached-global.html)|
 |druid-lookups-cached-single| Per lookup caching module to support the use cases where a lookup need to be isolated from the global pool of lookups |[link](../development/extensions-core/druid-lookups.html)|
 |druid-s3-extensions|Interfacing with data in AWS S3, and using S3 as deep storage.|[link](../development/extensions-core/s3.html)|

@@ -165,6 +165,7 @@ The tuningConfig is optional and default parameters will be used if no tuningConfig is specified.
 |indexSpec|Object|Tune how data is indexed. See below for more information.|no|
 |buildV9Directly|Boolean|Build v9 index directly instead of building v8 index and converting it to v9 format.|no (default = false)|
 |numBackgroundPersistThreads|Integer|The number of new background threads to use for incremental persists. Using this feature causes a notable increase in memory pressure and cpu usage but will make the job finish more quickly. If changing from the default of 0 (use current thread for persists), we recommend setting it to 1.|no (default == 0)|
+|forceExtendableShardSpecs|Boolean|Forces use of extendable shardSpecs. Experimental feature intended for use with the [Kafka indexing service extension](../development/extensions-core/kafka-ingestion.html).|no (default = false)|

 #### jobProperties field of TuningConfig

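To make the new batch-ingestion option concrete, here is a hedged sketch of a Hadoop tuningConfig that enables it; the `forceExtendableShardSpecs` field is the one added by this commit, while the other fields and values are illustrative only:

```json
{
  "type": "hadoop",
  "buildV9Directly": true,
  "numBackgroundPersistThreads": 1,
  "forceExtendableShardSpecs": true
}
```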
@@ -113,6 +113,8 @@ The tuningConfig is optional and default parameters will be used if no tuningConfig is specified.
 |rowFlushBoundary|Used in determining when intermediate persist should occur to disk.|75000|no|
 |numShards|Directly specify the number of shards to create. You can skip the intermediate persist step if you specify the number of shards you want and set targetPartitionSize=-1.|null|no|
 |indexSpec|defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no|
 |buildV9Directly|Whether to build a v9 index directly instead of first building a v8 index and then converting it to v9 format.|false|no|
+|forceExtendableShardSpecs|Forces use of extendable shardSpecs. Experimental feature intended for use with the [Kafka indexing service extension](../development/extensions-core/kafka-ingestion.html).|false|no|

 #### IndexSpec

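Likewise for the index task, a hedged sketch of a tuningConfig carrying the new flag; only `forceExtendableShardSpecs` comes from this commit, the remaining fields and values are illustrative:

```json
{
  "type": "index",
  "targetPartitionSize": 5000000,
  "rowFlushBoundary": 75000,
  "forceExtendableShardSpecs": true
}
```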
@@ -215,7 +215,8 @@ public class OrcIndexGeneratorJobTest
 true,
 null,
 true,
-null
+null,
+false
 )
 )
 );

@@ -336,6 +336,11 @@ public class HadoopDruidIndexerConfig
 return schema.getTuningConfig().getPartitionsSpec().getTargetPartitionSize();
 }

+public boolean isForceExtendableShardSpecs()
+{
+  return schema.getTuningConfig().isForceExtendableShardSpecs();
+}
+
 public long getMaxPartitionSize()
 {
 return schema.getTuningConfig().getPartitionsSpec().getMaxPartitionSize();

@@ -361,6 +366,11 @@ public class HadoopDruidIndexerConfig
 return schema.getTuningConfig().getShardSpecs().get(bucket.time).get(bucket.partitionNum);
 }

+public int getShardSpecCount(Bucket bucket)
+{
+  return schema.getTuningConfig().getShardSpecs().get(bucket.time).size();
+}
+
 public boolean isBuildV9Directly()
 {
 return schema.getTuningConfig().getBuildV9Directly();

@@ -64,7 +64,8 @@ public class HadoopTuningConfig implements TuningConfig
 false,
 null,
 DEFAULT_BUILD_V9_DIRECTLY,
-DEFAULT_NUM_BACKGROUND_PERSIST_THREADS
+DEFAULT_NUM_BACKGROUND_PERSIST_THREADS,
+false
 );
 }

@@ -83,6 +84,7 @@ public class HadoopTuningConfig implements TuningConfig
 private final boolean useCombiner;
 private final Boolean buildV9Directly;
 private final int numBackgroundPersistThreads;
+private final boolean forceExtendableShardSpecs;

 @JsonCreator
 public HadoopTuningConfig(

@@ -102,7 +104,8 @@ public class HadoopTuningConfig implements TuningConfig
 // See https://github.com/druid-io/druid/pull/1922
 final @JsonProperty("rowFlushBoundary") Integer maxRowsInMemoryCOMPAT,
 final @JsonProperty("buildV9Directly") Boolean buildV9Directly,
-final @JsonProperty("numBackgroundPersistThreads") Integer numBackgroundPersistThreads
+final @JsonProperty("numBackgroundPersistThreads") Integer numBackgroundPersistThreads,
+final @JsonProperty("forceExtendableShardSpecs") boolean forceExtendableShardSpecs
 )
 {
 this.workingPath = workingPath;

@@ -126,6 +129,7 @@ public class HadoopTuningConfig implements TuningConfig
 this.numBackgroundPersistThreads = numBackgroundPersistThreads == null
   ? DEFAULT_NUM_BACKGROUND_PERSIST_THREADS
   : numBackgroundPersistThreads;
+this.forceExtendableShardSpecs = forceExtendableShardSpecs;
 Preconditions.checkArgument(this.numBackgroundPersistThreads >= 0, "Not support persistBackgroundCount < 0");
 }

@@ -219,6 +223,12 @@ public class HadoopTuningConfig implements TuningConfig
 return numBackgroundPersistThreads;
 }

+@JsonProperty
+public boolean isForceExtendableShardSpecs()
+{
+  return forceExtendableShardSpecs;
+}
+
 public HadoopTuningConfig withWorkingPath(String path)
 {
 return new HadoopTuningConfig(

@@ -237,7 +247,8 @@ public class HadoopTuningConfig implements TuningConfig
 useCombiner,
 null,
 buildV9Directly,
-numBackgroundPersistThreads
+numBackgroundPersistThreads,
+forceExtendableShardSpecs
 );
 }

@@ -259,7 +270,8 @@ public class HadoopTuningConfig implements TuningConfig
 useCombiner,
 null,
 buildV9Directly,
-numBackgroundPersistThreads
+numBackgroundPersistThreads,
+forceExtendableShardSpecs
 );
 }

@@ -281,7 +293,8 @@ public class HadoopTuningConfig implements TuningConfig
 useCombiner,
 null,
 buildV9Directly,
-numBackgroundPersistThreads
+numBackgroundPersistThreads,
+forceExtendableShardSpecs
 );
 }
 }

@@ -46,12 +46,13 @@ import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.segment.BaseProgressIndicator;
 import io.druid.segment.ProgressIndicator;
 import io.druid.segment.QueryableIndex;
 import io.druid.segment.column.ColumnCapabilities;
 import io.druid.segment.column.ColumnCapabilitiesImpl;
 import io.druid.segment.incremental.IncrementalIndex;
 import io.druid.segment.incremental.IncrementalIndexSchema;
 import io.druid.segment.incremental.OnheapIncrementalIndex;
 import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.NumberedShardSpec;
 import io.druid.timeline.partition.ShardSpec;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;

@@ -700,6 +701,18 @@ public class IndexGeneratorJob implements Jobby
 }
 final FileSystem outputFS = new Path(config.getSchema().getIOConfig().getSegmentOutputPath())
   .getFileSystem(context.getConfiguration());

+// ShardSpec used for partitioning within this Hadoop job.
+final ShardSpec shardSpecForPartitioning = config.getShardSpec(bucket).getActualSpec();
+
+// ShardSpec to be published.
+final ShardSpec shardSpecForPublishing;
+if (config.isForceExtendableShardSpecs()) {
+  shardSpecForPublishing = new NumberedShardSpec(shardSpecForPartitioning.getPartitionNum(), config.getShardSpecCount(bucket));
+} else {
+  shardSpecForPublishing = shardSpecForPartitioning;
+}
+
 final DataSegment segmentTemplate = new DataSegment(
   config.getDataSource(),
   interval,

@@ -707,7 +720,7 @@ public class IndexGeneratorJob implements Jobby
 null,
 ImmutableList.copyOf(allDimensionNames),
 metricNames,
-config.getShardSpec(bucket).getActualSpec(),
+shardSpecForPublishing,
 -1,
 -1
 );

@@ -384,7 +384,8 @@ public class BatchDeltaIngestionTest
 false,
 null,
 null,
-null
+null,
+false
 )
 )
 );

@@ -163,7 +163,8 @@ public class DetermineHashedPartitionsJobTest
 false,
 null,
 null,
-null
+null,
+false
 )
 );
 this.indexerConfig = new HadoopDruidIndexerConfig(ingestionSpec);

@@ -267,7 +267,8 @@ public class DeterminePartitionsJobTest
 false,
 null,
 null,
-null
+null,
+false
 )
 )
 );

@@ -226,7 +226,8 @@ public class HadoopDruidIndexerConfigTest
 false,
 null,
 null,
-null
+null,
+false
 )
 );
 HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSpec(spec);

@@ -296,7 +297,8 @@ public class HadoopDruidIndexerConfigTest
 false,
 null,
 null,
-null
+null,
+false
 )
 );
 HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSpec(spec);

@@ -55,7 +55,8 @@ public class HadoopTuningConfigTest
 true,
 null,
 null,
-null
+null,
+true
 );

 HadoopTuningConfig actual = jsonReadWriteRead(jsonMapper.writeValueAsString(expected), HadoopTuningConfig.class);

@@ -74,6 +75,7 @@ public class HadoopTuningConfigTest
 Assert.assertEquals(true, actual.isCombineText());
 Assert.assertEquals(true, actual.getUseCombiner());
 Assert.assertEquals(0, actual.getNumBackgroundPersistThreads());
+Assert.assertEquals(true, actual.isForceExtendableShardSpecs());
 }

 public static <T> T jsonReadWriteRead(String s, Class<T> klass)

@@ -42,6 +42,7 @@ import io.druid.segment.indexing.DataSchema;
 import io.druid.segment.indexing.granularity.UniformGranularitySpec;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.HashBasedNumberedShardSpec;
+import io.druid.timeline.partition.NumberedShardSpec;
 import io.druid.timeline.partition.ShardSpec;
 import io.druid.timeline.partition.SingleDimensionShardSpec;
 import org.apache.commons.io.FileUtils;

@@ -89,7 +90,7 @@ public class IndexGeneratorJobTest
 @Parameterized.Parameters(name = "useCombiner={0}, partitionType={1}, interval={2}, shardInfoForEachSegment={3}, " +
   "data={4}, inputFormatName={5}, inputRowParser={6}, maxRowsInMemory={7}, " +
-  "aggs={8}, datasourceName={9}, buildV9Directly={10}")
+  "aggs={8}, datasourceName={9}, forceExtendableShardSpecs={10}, buildV9Directly={11}")
 public static Collection<Object[]> constructFeed()
 {
 final List<Object[]> baseConstructors = Arrays.asList(

@@ -363,17 +364,18 @@ public class IndexGeneratorJobTest
 }
 );

-// Run each baseConstructor with/without buildV9Directly.
+// Run each baseConstructor with/without buildV9Directly and forceExtendableShardSpecs.
 final List<Object[]> constructors = Lists.newArrayList();
 for (Object[] baseConstructor : baseConstructors) {
-  final Object[] c1 = new Object[baseConstructor.length + 1];
-  final Object[] c2 = new Object[baseConstructor.length + 1];
-  System.arraycopy(baseConstructor, 0, c1, 0, baseConstructor.length);
-  System.arraycopy(baseConstructor, 0, c2, 0, baseConstructor.length);
-  c1[c1.length - 1] = true;
-  c2[c2.length - 1] = false;
-  constructors.add(c1);
-  constructors.add(c2);
+  for (int buildV9Directly = 0; buildV9Directly < 2; buildV9Directly++) {
+    for (int forceExtendableShardSpecs = 0; forceExtendableShardSpecs < 2; forceExtendableShardSpecs++) {
+      final Object[] fullConstructor = new Object[baseConstructor.length + 2];
+      System.arraycopy(baseConstructor, 0, fullConstructor, 0, baseConstructor.length);
+      fullConstructor[baseConstructor.length] = forceExtendableShardSpecs == 0;
+      fullConstructor[baseConstructor.length + 1] = buildV9Directly == 0;
+      constructors.add(fullConstructor);
+    }
+  }
 }

 return constructors;

@@ -392,6 +394,7 @@ public class IndexGeneratorJobTest
 private final Integer maxRowsInMemory;
 private final AggregatorFactory[] aggs;
 private final String datasourceName;
+private final boolean forceExtendableShardSpecs;
 private final boolean buildV9Directly;

 private ObjectMapper mapper;

@@ -410,8 +413,9 @@ public class IndexGeneratorJobTest
 Integer maxRowsInMemory,
 AggregatorFactory[] aggs,
 String datasourceName,
+boolean forceExtendableShardSpecs,
 boolean buildV9Directly
 ) throws IOException
 {
 this.useCombiner = useCombiner;
 this.partitionType = partitionType;

@@ -423,6 +427,7 @@ public class IndexGeneratorJobTest
 this.maxRowsInMemory = maxRowsInMemory;
 this.aggs = aggs;
 this.datasourceName = datasourceName;
+this.forceExtendableShardSpecs = forceExtendableShardSpecs;
 this.buildV9Directly = buildV9Directly;
 }

@@ -511,7 +516,8 @@ public class IndexGeneratorJobTest
 useCombiner,
 null,
 buildV9Directly,
-null
+null,
+forceExtendableShardSpecs
 )
 )
 );

@@ -619,7 +625,11 @@ public class IndexGeneratorJobTest
 Assert.fail("Test did not specify supported datasource name");
 }

-if (partitionType.equals("hashed")) {
+if (forceExtendableShardSpecs) {
+  NumberedShardSpec spec = (NumberedShardSpec) dataSegment.getShardSpec();
+  Assert.assertEquals(partitionNum, spec.getPartitionNum());
+  Assert.assertEquals(shardInfo.length, spec.getPartitions());
+} else if (partitionType.equals("hashed")) {
   Integer[] hashShardInfo = (Integer[]) shardInfo[partitionNum];
   HashBasedNumberedShardSpec spec = (HashBasedNumberedShardSpec) dataSegment.getShardSpec();
   Assert.assertEquals((int) hashShardInfo[0], spec.getPartitionNum());

@@ -118,7 +118,8 @@ public class JobHelperTest
 false,
 null,
 null,
-null
+null,
+false
 )
 )
 );

@@ -120,7 +120,24 @@ public class GranularityPathSpecTest
 jsonMapper
 ),
 new HadoopIOConfig(null, null, null),
-new HadoopTuningConfig(null, null, null, null, null, null, false, false, false, false, null, false, false, null, null, null
+new HadoopTuningConfig(
+  null,
+  null,
+  null,
+  null,
+  null,
+  null,
+  false,
+  false,
+  false,
+  false,
+  null,
+  false,
+  false,
+  null,
+  null,
+  null,
+  false
+)
 );

@@ -206,7 +206,8 @@ public class HadoopConverterJobTest
 false,
 null,
 null,
-null
+null,
+false
 )
 )
 );

@@ -62,6 +62,7 @@ import io.druid.segment.realtime.plumber.Plumber;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.HashBasedNumberedShardSpec;
 import io.druid.timeline.partition.NoneShardSpec;
+import io.druid.timeline.partition.NumberedShardSpec;
 import io.druid.timeline.partition.ShardSpec;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;

@@ -218,10 +219,22 @@ public class IndexTask extends AbstractFixedIntervalTask
   }
 }
 for (final ShardSpec shardSpec : shardSpecs) {
+  // ShardSpec to be published.
+  final ShardSpec shardSpecForPublishing;
+  if (ingestionSchema.getTuningConfig().isForceExtendableShardSpecs()) {
+    shardSpecForPublishing = new NumberedShardSpec(
+      shardSpec.getPartitionNum(),
+      shardSpecs.size()
+    );
+  } else {
+    shardSpecForPublishing = shardSpec;
+  }
+
   final DataSegment segment = generateSegment(
     toolbox,
     ingestionSchema.getDataSchema(),
     shardSpec,
+    shardSpecForPublishing,
     bucket,
     myLock.getVersion()
   );

@@ -314,7 +327,8 @@ public class IndexTask extends AbstractFixedIntervalTask
 private DataSegment generateSegment(
   final TaskToolbox toolbox,
   final DataSchema schema,
-  final ShardSpec shardSpec,
+  final ShardSpec shardSpecForPartitioning,
+  final ShardSpec shardSpecForPublishing,
   final Interval interval,
   final String version
 ) throws IOException

@@ -328,7 +342,7 @@ public class IndexTask extends AbstractFixedIntervalTask
 interval.getStart(),
 interval.getEnd(),
 version,
-shardSpec.getPartitionNum()
+shardSpecForPartitioning.getPartitionNum()
 )
 );

@@ -381,7 +395,7 @@ public class IndexTask extends AbstractFixedIntervalTask
 ).findPlumber(
   schema,
   convertTuningConfig(
-    shardSpec,
+    shardSpecForPublishing,
     myRowFlushBoundary,
     ingestionSchema.getTuningConfig().getIndexSpec(),
     ingestionSchema.tuningConfig.getBuildV9Directly()

@@ -396,7 +410,7 @@ public class IndexTask extends AbstractFixedIntervalTask
 while (firehose.hasMore()) {
   final InputRow inputRow = firehose.nextRow();

-  if (shouldIndex(shardSpec, interval, inputRow, rollupGran)) {
+  if (shouldIndex(shardSpecForPartitioning, interval, inputRow, rollupGran)) {
     int numRows = plumber.add(inputRow, committerSupplier);
     if (numRows == -1) {
       throw new ISE(

@@ -427,7 +441,7 @@ public class IndexTask extends AbstractFixedIntervalTask
 + " and output %,d rows",
 getId(),
 interval,
-shardSpec.getPartitionNum(),
+shardSpecForPartitioning.getPartitionNum(),
 metrics.processed() + metrics.unparseable() + metrics.thrownAway(),
 metrics.processed(),
 metrics.unparseable(),

@@ -457,7 +471,7 @@ public class IndexTask extends AbstractFixedIntervalTask
 this.dataSchema = dataSchema;
 this.ioConfig = ioConfig;
-this.tuningConfig = tuningConfig == null ? new IndexTuningConfig(0, 0, null, null, null) : tuningConfig;
+this.tuningConfig = tuningConfig == null ? new IndexTuningConfig(0, 0, null, null, null, false) : tuningConfig;
 }

 @Override

@@ -515,6 +529,7 @@ public class IndexTask extends AbstractFixedIntervalTask
 private final int numShards;
 private final IndexSpec indexSpec;
 private final Boolean buildV9Directly;
+private final boolean forceExtendableShardSpecs;

 @JsonCreator
 public IndexTuningConfig(

@@ -522,7 +537,8 @@ public class IndexTask extends AbstractFixedIntervalTask
 @JsonProperty("rowFlushBoundary") int rowFlushBoundary,
 @JsonProperty("numShards") @Nullable Integer numShards,
 @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
-@JsonProperty("buildV9Directly") Boolean buildV9Directly
+@JsonProperty("buildV9Directly") Boolean buildV9Directly,
+@JsonProperty("forceExtendableShardSpecs") boolean forceExtendableShardSpecs
 )
 {
 this.targetPartitionSize = targetPartitionSize == 0 ? DEFAULT_TARGET_PARTITION_SIZE : targetPartitionSize;

@@ -535,6 +551,7 @@ public class IndexTask extends AbstractFixedIntervalTask
 "targetPartitionsSize and shardCount both cannot be set"
 );
 this.buildV9Directly = buildV9Directly == null ? DEFAULT_BUILD_V9_DIRECTLY : buildV9Directly;
+this.forceExtendableShardSpecs = forceExtendableShardSpecs;
 }

 @JsonProperty

@@ -566,5 +583,11 @@ public class IndexTask extends AbstractFixedIntervalTask
 {
   return buildV9Directly;
 }
+
+@JsonProperty
+public boolean isForceExtendableShardSpecs()
+{
+  return forceExtendableShardSpecs;
+}
 }
 }

@@ -47,6 +47,7 @@ import io.druid.segment.indexing.granularity.UniformGranularitySpec;
 import io.druid.segment.loading.DataSegmentPusher;
 import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.HashBasedNumberedShardSpec;
+import io.druid.timeline.partition.NumberedShardSpec;
 import io.druid.timeline.partition.ShardSpec;
 import org.joda.time.DateTime;

@@ -60,6 +61,8 @@ import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;

@@ -144,7 +147,8 @@ public class IndexTaskTest
 0,
 null,
 indexSpec,
-null
+null,
+false
 )
 ),
 jsonMapper,

@@ -154,6 +158,104 @@ public class IndexTaskTest
 final List<DataSegment> segments = runTask(indexTask);

 Assert.assertEquals(2, segments.size());
+
+Assert.assertEquals("test", segments.get(0).getDataSource());
+Assert.assertEquals(new Interval("2014/P1D"), segments.get(0).getInterval());
+Assert.assertTrue(segments.get(0).getShardSpec().getClass().equals(HashBasedNumberedShardSpec.class));
+Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum());
+Assert.assertEquals(2, ((NumberedShardSpec) segments.get(0).getShardSpec()).getPartitions());
+
+Assert.assertEquals("test", segments.get(1).getDataSource());
+Assert.assertEquals(new Interval("2014/P1D"), segments.get(1).getInterval());
+Assert.assertTrue(segments.get(1).getShardSpec().getClass().equals(HashBasedNumberedShardSpec.class));
+Assert.assertEquals(1, segments.get(1).getShardSpec().getPartitionNum());
+Assert.assertEquals(2, ((NumberedShardSpec) segments.get(1).getShardSpec()).getPartitions());
 }

+@Test
+public void testForceExtendableShardSpecs() throws Exception
+{
+  File tmpDir = temporaryFolder.newFolder();
+
+  File tmpFile = File.createTempFile("druid", "index", tmpDir);
+
+  PrintWriter writer = new PrintWriter(tmpFile);
+  writer.println("2014-01-01T00:00:10Z,a,1");
+  writer.println("2014-01-01T01:00:20Z,b,1");
+  writer.println("2014-01-01T02:00:30Z,c,1");
+  writer.close();
+
+  IndexTask indexTask = new IndexTask(
+    null,
+    null,
+    new IndexTask.IndexIngestionSpec(
+      new DataSchema(
+        "test",
+        jsonMapper.convertValue(
+          new StringInputRowParser(
+            new CSVParseSpec(
+              new TimestampSpec(
+                "ts",
+                "auto",
+                null
+              ),
+              new DimensionsSpec(
+                DimensionsSpec.getDefaultSchemas(Arrays.asList("ts")),
+                Lists.<String>newArrayList(),
+                Lists.<SpatialDimensionSchema>newArrayList()
+              ),
+              null,
+              Arrays.asList("ts", "dim", "val")
+            ),
+            null
+          ),
+          Map.class
+        ),
+        new AggregatorFactory[]{
+          new LongSumAggregatorFactory("val", "val")
+        },
+        new UniformGranularitySpec(
+          Granularity.DAY,
+          QueryGranularities.MINUTE,
+          Arrays.asList(new Interval("2014/2015"))
+        ),
+        jsonMapper
+      ),
+      new IndexTask.IndexIOConfig(
+        new LocalFirehoseFactory(
+          tmpDir,
+          "druid*",
+          null
+        )
+      ),
+      new IndexTask.IndexTuningConfig(
+        2,
+        0,
+        null,
+        indexSpec,
+        null,
+        true
+      )
+    ),
+    jsonMapper,
+    null
+  );
+
+  final List<DataSegment> segments = runTask(indexTask);
+
+  Assert.assertEquals(2, segments.size());
+
+  Assert.assertEquals("test", segments.get(0).getDataSource());
+  Assert.assertEquals(new Interval("2014/P1D"), segments.get(0).getInterval());
+  Assert.assertTrue(segments.get(0).getShardSpec().getClass().equals(NumberedShardSpec.class));
+  Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum());
+  Assert.assertEquals(2, ((NumberedShardSpec) segments.get(0).getShardSpec()).getPartitions());
+
+  Assert.assertEquals("test", segments.get(1).getDataSource());
+  Assert.assertEquals(new Interval("2014/P1D"), segments.get(1).getInterval());
+  Assert.assertTrue(segments.get(1).getShardSpec().getClass().equals(NumberedShardSpec.class));
+  Assert.assertEquals(1, segments.get(1).getShardSpec().getPartitionNum());
+  Assert.assertEquals(2, ((NumberedShardSpec) segments.get(1).getShardSpec()).getPartitions());
+}
+
 @Test

@@ -268,6 +370,8 @@ public class IndexTaskTest
 )
 );

+Collections.sort(segments);
+
 return segments;
 }

@@ -346,7 +450,8 @@ public class IndexTaskTest
 1000,
 null,
 new IndexSpec(),
-null
+null,
+false
 );
 RealtimeTuningConfig realtimeTuningConfig = IndexTask.convertTuningConfig(
 spec,

@@ -93,7 +93,7 @@ public class TaskSerdeTest
 jsonMapper
 ),
 new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null)),
-new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec, null)
+new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec, null, false)
 ),
 jsonMapper,
 null

@@ -134,7 +134,7 @@ public class TaskSerdeTest
 jsonMapper
 ),
 new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null)),
-new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec, null)
+new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec, null, false)
 ),
 jsonMapper,
 null

@@ -651,7 +651,7 @@ public class TaskLifecycleTest
 mapper
 ),
 new IndexTask.IndexIOConfig(new MockFirehoseFactory(false)),
-new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec, null)
+new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec, null, false)
 ),
 mapper,
 null

@@ -709,7 +709,7 @@ public class TaskLifecycleTest
 mapper
 ),
 new IndexTask.IndexIOConfig(new MockExceptionalFirehoseFactory()),
-new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec, null)
+new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec, null, false)
 ),
 mapper,
 null

@@ -1068,7 +1068,7 @@ public class TaskLifecycleTest
 mapper
 ),
 new IndexTask.IndexIOConfig(new MockFirehoseFactory(false)),
-new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec, null)
+new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec, null, false)
 ),
 mapper,
 null