mirror of https://github.com/apache/druid.git
IndexTask: Use shared groupId when "appendToExisting" is on. (#4582)
This allows the tasks to run concurrently. Additionally, rework the partition-determining code in a couple ways: - Use a task-id based sequenceName so concurrently running append tasks do not clobber each others' segments. - Make the list of shardSpecs empty when rollup is non-guaranteed, and let allocators handle the creation of incremental shardSpecs.
This commit is contained in:
parent
4bd0f174ba
commit
a9c875d746
|
@ -19,7 +19,6 @@
|
|||
|
||||
package io.druid.indexing.common.task;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JacksonInject;
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
@ -43,7 +42,6 @@ import io.druid.data.input.Firehose;
|
|||
import io.druid.data.input.FirehoseFactory;
|
||||
import io.druid.data.input.InputRow;
|
||||
import io.druid.data.input.Rows;
|
||||
import io.druid.guice.annotations.Smile;
|
||||
import io.druid.hll.HyperLogLogCollector;
|
||||
import io.druid.indexing.appenderator.ActionBasedSegmentAllocator;
|
||||
import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
|
||||
|
@ -96,15 +94,16 @@ import org.joda.time.Period;
|
|||
import javax.annotation.Nullable;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.SortedSet;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
@ -113,12 +112,24 @@ public class IndexTask extends AbstractTask
|
|||
{
|
||||
private static final Logger log = new Logger(IndexTask.class);
|
||||
private static final HashFunction hashFunction = Hashing.murmur3_128();
|
||||
private static final String TYPE = "index";
|
||||
|
||||
private static String makeId(String id, IndexIngestionSpec ingestionSchema)
|
||||
{
|
||||
return id != null ? id : StringUtils.format("index_%s_%s", makeDataSource(ingestionSchema), new DateTime());
|
||||
}
|
||||
|
||||
private static String makeGroupId(IndexIngestionSpec ingestionSchema)
|
||||
{
|
||||
if (ingestionSchema.getIOConfig().isAppendToExisting()) {
|
||||
// Shared locking group for all tasks that append, since they are OK to run concurrently.
|
||||
return StringUtils.format("%s_append_%s", TYPE, ingestionSchema.getDataSchema().getDataSource());
|
||||
} else {
|
||||
// Return null, one locking group per task.
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private static String makeDataSource(IndexIngestionSpec ingestionSchema)
|
||||
{
|
||||
return ingestionSchema.getDataSchema().getDataSource();
|
||||
|
@ -126,27 +137,30 @@ public class IndexTask extends AbstractTask
|
|||
|
||||
@JsonIgnore
|
||||
private final IndexIngestionSpec ingestionSchema;
|
||||
private final ObjectMapper smileMapper;
|
||||
|
||||
@JsonCreator
|
||||
public IndexTask(
|
||||
@JsonProperty("id") final String id,
|
||||
@JsonProperty("resource") final TaskResource taskResource,
|
||||
@JsonProperty("spec") final IndexIngestionSpec ingestionSchema,
|
||||
@JsonProperty("context") final Map<String, Object> context,
|
||||
@Smile @JacksonInject final ObjectMapper smileMapper
|
||||
@JsonProperty("context") final Map<String, Object> context
|
||||
)
|
||||
{
|
||||
super(makeId(id, ingestionSchema), null, taskResource, makeDataSource(ingestionSchema), context);
|
||||
super(
|
||||
makeId(id, ingestionSchema),
|
||||
makeGroupId(ingestionSchema),
|
||||
taskResource,
|
||||
makeDataSource(ingestionSchema),
|
||||
context
|
||||
);
|
||||
|
||||
this.ingestionSchema = ingestionSchema;
|
||||
this.smileMapper = smileMapper;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getType()
|
||||
{
|
||||
return "index";
|
||||
return TYPE;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -226,6 +240,11 @@ public class IndexTask extends AbstractTask
|
|||
return tuningConfig.isForceGuaranteedRollup();
|
||||
}
|
||||
|
||||
private static boolean isExtendableShardSpecs(IndexIOConfig ioConfig, IndexTuningConfig tuningConfig)
|
||||
{
|
||||
return !isGuaranteedRollup(ioConfig, tuningConfig);
|
||||
}
|
||||
|
||||
/**
|
||||
* Determines intervals and shardSpecs for input data. This method first checks that it must determine intervals and
|
||||
* shardSpecs by itself. Intervals must be determined if they are not specified in {@link GranularitySpec}.
|
||||
|
@ -253,16 +272,22 @@ public class IndexTask extends AbstractTask
|
|||
|
||||
final GranularitySpec granularitySpec = ingestionSchema.getDataSchema().getGranularitySpec();
|
||||
|
||||
// Must determine intervals if unknown, since we acquire all locks before processing any data.
|
||||
final boolean determineIntervals = !granularitySpec.bucketIntervals().isPresent();
|
||||
// Guaranteed rollup means that this index task guarantees the 'perfect rollup' across the entire data set.
|
||||
final boolean guaranteedRollup = isGuaranteedRollup(ioConfig, tuningConfig);
|
||||
final boolean determineNumPartitions = tuningConfig.getNumShards() == null && guaranteedRollup;
|
||||
final boolean useExtendableShardSpec = !guaranteedRollup;
|
||||
|
||||
// Must determine partitions if rollup is guaranteed and the user didn't provide a specific value.
|
||||
final boolean determineNumPartitions = tuningConfig.getNumShards() == null
|
||||
&& isGuaranteedRollup(ioConfig, tuningConfig);
|
||||
|
||||
// if we were given number of shards per interval and the intervals, we don't need to scan the data
|
||||
if (!determineNumPartitions && !determineIntervals) {
|
||||
log.info("Skipping determine partition scan");
|
||||
return createShardSpecWithoutInputScan(jsonMapper, granularitySpec, tuningConfig, useExtendableShardSpec);
|
||||
return createShardSpecWithoutInputScan(
|
||||
jsonMapper,
|
||||
granularitySpec,
|
||||
ioConfig,
|
||||
tuningConfig
|
||||
);
|
||||
} else {
|
||||
// determine intervals containing data and prime HLL collectors
|
||||
return createShardSpecsFromInput(
|
||||
|
@ -273,8 +298,7 @@ public class IndexTask extends AbstractTask
|
|||
granularitySpec,
|
||||
tuningConfig,
|
||||
determineIntervals,
|
||||
determineNumPartitions,
|
||||
useExtendableShardSpec
|
||||
determineNumPartitions
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -282,32 +306,36 @@ public class IndexTask extends AbstractTask
|
|||
private static ShardSpecs createShardSpecWithoutInputScan(
|
||||
ObjectMapper jsonMapper,
|
||||
GranularitySpec granularitySpec,
|
||||
IndexTuningConfig tuningConfig,
|
||||
boolean useExtendableShardSpec
|
||||
IndexIOConfig ioConfig,
|
||||
IndexTuningConfig tuningConfig
|
||||
)
|
||||
{
|
||||
final int numShards = tuningConfig.getNumShards() == null ? 1 : tuningConfig.getNumShards();
|
||||
final BiFunction<Integer, Integer, ShardSpec> shardSpecCreateFn = getShardSpecCreateFunction(
|
||||
useExtendableShardSpec,
|
||||
numShards,
|
||||
jsonMapper
|
||||
);
|
||||
final Map<Interval, List<ShardSpec>> shardSpecs = new HashMap<>();
|
||||
final SortedSet<Interval> intervals = granularitySpec.bucketIntervals().get();
|
||||
|
||||
final Map<Interval, List<ShardSpec>> intervalToShardSpecs = new HashMap<>();
|
||||
for (Interval interval : granularitySpec.bucketIntervals().get()) {
|
||||
final List<ShardSpec> intervalShardSpecs = IntStream.range(0, numShards)
|
||||
.mapToObj(
|
||||
shardId -> shardSpecCreateFn.apply(shardId, numShards)
|
||||
)
|
||||
.collect(Collectors.toList());
|
||||
intervalToShardSpecs.put(interval, intervalShardSpecs);
|
||||
}
|
||||
if (isGuaranteedRollup(ioConfig, tuningConfig)) {
|
||||
// Overwrite mode, guaranteed rollup: shardSpecs must be known in advance.
|
||||
final int numShards = tuningConfig.getNumShards() == null ? 1 : tuningConfig.getNumShards();
|
||||
final BiFunction<Integer, Integer, ShardSpec> shardSpecCreateFn = getShardSpecCreateFunction(
|
||||
numShards,
|
||||
jsonMapper
|
||||
);
|
||||
|
||||
if (useExtendableShardSpec) {
|
||||
return createExtendableShardSpecs(intervalToShardSpecs);
|
||||
for (Interval interval : intervals) {
|
||||
final List<ShardSpec> intervalShardSpecs = IntStream.range(0, numShards)
|
||||
.mapToObj(
|
||||
shardId -> shardSpecCreateFn.apply(shardId, numShards)
|
||||
)
|
||||
.collect(Collectors.toList());
|
||||
shardSpecs.put(interval, intervalShardSpecs);
|
||||
}
|
||||
} else {
|
||||
return createNonExtendableShardSpecs(intervalToShardSpecs);
|
||||
for (Interval interval : intervals) {
|
||||
shardSpecs.put(interval, ImmutableList.of());
|
||||
}
|
||||
}
|
||||
|
||||
return new ShardSpecs(shardSpecs);
|
||||
}
|
||||
|
||||
private static ShardSpecs createShardSpecsFromInput(
|
||||
|
@ -318,8 +346,7 @@ public class IndexTask extends AbstractTask
|
|||
GranularitySpec granularitySpec,
|
||||
IndexTuningConfig tuningConfig,
|
||||
boolean determineIntervals,
|
||||
boolean determineNumPartitions,
|
||||
boolean useExtendableShardSpec
|
||||
boolean determineNumPartitions
|
||||
) throws IOException
|
||||
{
|
||||
log.info("Determining intervals and shardSpecs");
|
||||
|
@ -339,11 +366,11 @@ public class IndexTask extends AbstractTask
|
|||
final int defaultNumShards = tuningConfig.getNumShards() == null ? 1 : tuningConfig.getNumShards();
|
||||
for (final Map.Entry<Interval, Optional<HyperLogLogCollector>> entry : hllCollectors.entrySet()) {
|
||||
final Interval interval = entry.getKey();
|
||||
final Optional<HyperLogLogCollector> collector = entry.getValue();
|
||||
final HyperLogLogCollector collector = entry.getValue().orNull();
|
||||
|
||||
final int numShards;
|
||||
if (determineNumPartitions) {
|
||||
final long numRows = new Double(collector.get().estimateCardinality()).longValue();
|
||||
final long numRows = new Double(collector.estimateCardinality()).longValue();
|
||||
numShards = (int) Math.ceil((double) numRows / tuningConfig.getTargetPartitionSize());
|
||||
log.info("Estimated [%,d] rows of data for interval [%s], creating [%,d] shards", numRows, interval, numShards);
|
||||
} else {
|
||||
|
@ -351,26 +378,26 @@ public class IndexTask extends AbstractTask
|
|||
log.info("Creating [%,d] shards for interval [%s]", numShards, interval);
|
||||
}
|
||||
|
||||
final BiFunction<Integer, Integer, ShardSpec> shardSpecCreateFn = getShardSpecCreateFunction(
|
||||
useExtendableShardSpec,
|
||||
numShards,
|
||||
jsonMapper
|
||||
);
|
||||
if (isGuaranteedRollup(ingestionSchema.getIOConfig(), ingestionSchema.getTuningConfig())) {
|
||||
// Overwrite mode, guaranteed rollup: shardSpecs must be known in advance.
|
||||
final BiFunction<Integer, Integer, ShardSpec> shardSpecCreateFn = getShardSpecCreateFunction(
|
||||
numShards,
|
||||
jsonMapper
|
||||
);
|
||||
|
||||
final List<ShardSpec> intervalShardSpecs = IntStream.range(0, numShards)
|
||||
.mapToObj(
|
||||
shardId -> shardSpecCreateFn.apply(shardId, numShards)
|
||||
).collect(Collectors.toList());
|
||||
final List<ShardSpec> intervalShardSpecs = IntStream.range(0, numShards)
|
||||
.mapToObj(
|
||||
shardId -> shardSpecCreateFn.apply(shardId, numShards)
|
||||
).collect(Collectors.toList());
|
||||
|
||||
intervalToShardSpecs.put(interval, intervalShardSpecs);
|
||||
intervalToShardSpecs.put(interval, intervalShardSpecs);
|
||||
} else {
|
||||
intervalToShardSpecs.put(interval, ImmutableList.of());
|
||||
}
|
||||
}
|
||||
log.info("Found intervals and shardSpecs in %,dms", System.currentTimeMillis() - determineShardSpecsStartMillis);
|
||||
|
||||
if (useExtendableShardSpec) {
|
||||
return createExtendableShardSpecs(intervalToShardSpecs);
|
||||
} else {
|
||||
return createNonExtendableShardSpecs(intervalToShardSpecs);
|
||||
}
|
||||
return new ShardSpecs(intervalToShardSpecs);
|
||||
}
|
||||
|
||||
private static Map<Interval, Optional<HyperLogLogCollector>> collectIntervalsAndShardSpecs(
|
||||
|
@ -453,93 +480,17 @@ public class IndexTask extends AbstractTask
|
|||
return hllCollectors;
|
||||
}
|
||||
|
||||
private static ShardSpecs createNonExtendableShardSpecs(Map<Interval, List<ShardSpec>> intervalToShardSpecs)
|
||||
{
|
||||
return new ShardSpecs()
|
||||
{
|
||||
@Override
|
||||
public Collection<Interval> getIntervals()
|
||||
{
|
||||
return intervalToShardSpecs.keySet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ShardSpec getShardSpec(Interval interval, InputRow row)
|
||||
{
|
||||
final List<ShardSpec> shardSpecs = intervalToShardSpecs.get(interval);
|
||||
if (shardSpecs == null || shardSpecs.isEmpty()) {
|
||||
throw new ISE("Failed to get shardSpec for interval[%s]", interval);
|
||||
}
|
||||
return shardSpecs.get(0).getLookup(shardSpecs).getShardSpec(row.getTimestampFromEpoch(), row);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateShardSpec(Interval interval)
|
||||
{
|
||||
// do nothing
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static ShardSpecs createExtendableShardSpecs(Map<Interval, List<ShardSpec>> intervalToShardSpec)
|
||||
{
|
||||
final Map<Interval, ShardSpec> shardSpecMap = new HashMap<>(intervalToShardSpec.size());
|
||||
|
||||
intervalToShardSpec.forEach((interval, shardSpecs) -> {
|
||||
Preconditions.checkState(shardSpecs.size() == 1);
|
||||
shardSpecMap.put(interval, shardSpecs.get(0));
|
||||
});
|
||||
|
||||
return new ShardSpecs()
|
||||
{
|
||||
@Override
|
||||
public Collection<Interval> getIntervals()
|
||||
{
|
||||
return shardSpecMap.keySet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ShardSpec getShardSpec(Interval interval, InputRow row)
|
||||
{
|
||||
return shardSpecMap.get(interval);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateShardSpec(Interval interval)
|
||||
{
|
||||
final ShardSpec shardSpec = shardSpecMap.get(interval);
|
||||
Preconditions.checkState(
|
||||
shardSpec instanceof NumberedShardSpec,
|
||||
"shardSpec[%s] must be NumberedShardSpec",
|
||||
shardSpec.getClass().getCanonicalName()
|
||||
);
|
||||
final NumberedShardSpec previous = (NumberedShardSpec) shardSpec;
|
||||
Preconditions.checkNotNull(previous, "previous shardSpec for interval[%s] is null", interval);
|
||||
shardSpecMap.put(interval, new NumberedShardSpec(previous.getPartitionNum() + 1, previous.getPartitions()));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static BiFunction<Integer, Integer, ShardSpec> getShardSpecCreateFunction(
|
||||
boolean useExtendableShardSpec,
|
||||
Integer numShards,
|
||||
ObjectMapper jsonMapper
|
||||
)
|
||||
{
|
||||
if (useExtendableShardSpec) {
|
||||
// 0 partitions means there's no core partitions. See NumberedPartitionChunk.isStart() and
|
||||
// NumberedPartitionChunk.isEnd().
|
||||
return (shardId, notUsed) -> new NumberedShardSpec(shardId, 0);
|
||||
} else {
|
||||
if (numShards == null) {
|
||||
throw new ISE("numShards must not be null");
|
||||
}
|
||||
Preconditions.checkNotNull(numShards, "numShards");
|
||||
|
||||
if (numShards == 1) {
|
||||
return (shardId, totalNumShards) -> NoneShardSpec.instance();
|
||||
} else {
|
||||
return (shardId, totalNumShards) -> new HashBasedNumberedShardSpec(shardId, totalNumShards, null, jsonMapper);
|
||||
}
|
||||
if (numShards == 1) {
|
||||
return (shardId, totalNumShards) -> NoneShardSpec.instance();
|
||||
} else {
|
||||
return (shardId, totalNumShards) -> new HashBasedNumberedShardSpec(shardId, totalNumShards, null, jsonMapper);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -549,12 +500,12 @@ public class IndexTask extends AbstractTask
|
|||
* if one of below conditions are satisfied.
|
||||
*
|
||||
* <ul>
|
||||
* <li>
|
||||
* If the number of rows in a segment exceeds {@link IndexTuningConfig#targetPartitionSize}
|
||||
* </li>
|
||||
* <li>
|
||||
* If the number of rows added to {@link AppenderatorDriver} so far exceeds {@link IndexTuningConfig#maxTotalRows}
|
||||
* </li>
|
||||
* <li>
|
||||
* If the number of rows in a segment exceeds {@link IndexTuningConfig#targetPartitionSize}
|
||||
* </li>
|
||||
* <li>
|
||||
* If the number of rows added to {@link AppenderatorDriver} so far exceeds {@link IndexTuningConfig#maxTotalRows}
|
||||
* </li>
|
||||
* </ul>
|
||||
*
|
||||
* At the end of this method, all the remaining segments are published.
|
||||
|
@ -591,14 +542,43 @@ public class IndexTask extends AbstractTask
|
|||
final long publishTimeout = tuningConfig.getPublishTimeout();
|
||||
final long maxRowsInAppenderator = tuningConfig.getMaxTotalRows();
|
||||
final int maxRowsInSegment = tuningConfig.getTargetPartitionSize() == null
|
||||
? Integer.MAX_VALUE
|
||||
: tuningConfig.getTargetPartitionSize();
|
||||
? Integer.MAX_VALUE
|
||||
: tuningConfig.getTargetPartitionSize();
|
||||
final boolean isGuaranteedRollup = isGuaranteedRollup(ioConfig, tuningConfig);
|
||||
|
||||
final SegmentAllocator segmentAllocator;
|
||||
if (ioConfig.isAppendToExisting()) {
|
||||
if (isGuaranteedRollup) {
|
||||
// Overwrite mode, guaranteed rollup: segments are all known in advance and there is one per sequenceName.
|
||||
final Map<String, SegmentIdentifier> lookup = new HashMap<>();
|
||||
|
||||
for (Map.Entry<Interval, List<ShardSpec>> entry : shardSpecs.getMap().entrySet()) {
|
||||
for (ShardSpec shardSpec : entry.getValue()) {
|
||||
final ShardSpec shardSpecForPublishing;
|
||||
|
||||
if (isExtendableShardSpecs(ioConfig, tuningConfig)) {
|
||||
shardSpecForPublishing = new NumberedShardSpec(
|
||||
shardSpec.getPartitionNum(),
|
||||
entry.getValue().size()
|
||||
);
|
||||
} else {
|
||||
shardSpecForPublishing = shardSpec;
|
||||
}
|
||||
|
||||
lookup.put(
|
||||
Appenderators.getSequenceName(entry.getKey(), version, shardSpec),
|
||||
new SegmentIdentifier(getDataSource(), entry.getKey(), version, shardSpecForPublishing)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
segmentAllocator = (row, sequenceName, previousSegmentId) -> lookup.get(sequenceName);
|
||||
} else if (ioConfig.isAppendToExisting()) {
|
||||
// Append mode: Allocate segments as needed using Overlord APIs.
|
||||
segmentAllocator = new ActionBasedSegmentAllocator(toolbox.getTaskActionClient(), dataSchema);
|
||||
} else {
|
||||
// Overwrite mode, non-guaranteed rollup: We can make up our own segment ids but we don't know them in advance.
|
||||
final Map<Interval, AtomicInteger> counters = new HashMap<>();
|
||||
|
||||
segmentAllocator = (row, sequenceName, previousSegmentId) -> {
|
||||
final DateTime timestamp = row.getTimestamp();
|
||||
Optional<Interval> maybeInterval = granularitySpec.bucketInterval(timestamp);
|
||||
|
@ -607,12 +587,12 @@ public class IndexTask extends AbstractTask
|
|||
}
|
||||
|
||||
final Interval interval = maybeInterval.get();
|
||||
final ShardSpec shardSpec = shardSpecs.getShardSpec(interval, row);
|
||||
if (shardSpec == null) {
|
||||
if (!shardSpecs.getMap().containsKey(interval)) {
|
||||
throw new ISE("Could not find shardSpec for interval[%s]", interval);
|
||||
}
|
||||
|
||||
return new SegmentIdentifier(getDataSource(), interval, version, shardSpec);
|
||||
final int partitionNum = counters.computeIfAbsent(interval, x -> new AtomicInteger()).getAndIncrement();
|
||||
return new SegmentIdentifier(getDataSource(), interval, version, new NumberedShardSpec(partitionNum, 0));
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -652,9 +632,20 @@ public class IndexTask extends AbstractTask
|
|||
continue;
|
||||
}
|
||||
|
||||
final Interval interval = optInterval.get();
|
||||
final ShardSpec shardSpec = shardSpecs.getShardSpec(interval, inputRow);
|
||||
final String sequenceName = Appenderators.getSequenceName(interval, version, shardSpec);
|
||||
|
||||
final String sequenceName;
|
||||
|
||||
if (isGuaranteedRollup) {
|
||||
// Sequence name is based solely on the shardSpec, and there will only be one segment per sequence.
|
||||
final Interval interval = optInterval.get();
|
||||
final ShardSpec shardSpec = shardSpecs.getShardSpec(interval, inputRow);
|
||||
sequenceName = Appenderators.getSequenceName(interval, version, shardSpec);
|
||||
} else {
|
||||
// Segments are created as needed, using a single sequence name. They may be allocated from the overlord
|
||||
// (in append mode) or may be created on our own authority (in overwrite mode).
|
||||
sequenceName = getId();
|
||||
}
|
||||
|
||||
final AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName, committerSupplier);
|
||||
|
||||
if (addResult.isOk()) {
|
||||
|
@ -672,7 +663,6 @@ public class IndexTask extends AbstractTask
|
|||
),
|
||||
publishTimeout
|
||||
);
|
||||
published.getSegments().forEach(segment -> shardSpecs.updateShardSpec(segment.getInterval()));
|
||||
// Even though IndexTask uses NoopHandoffNotifier which does nothing for segment handoff,
|
||||
// the below code is needed to update the total number of rows added to the appenderator so far.
|
||||
// See AppenderatorDriver.registerHandoff() and Appenderator.drop().
|
||||
|
@ -782,35 +772,48 @@ public class IndexTask extends AbstractTask
|
|||
}
|
||||
|
||||
/**
|
||||
* This interface represents a map of (Interval, ShardSpec) and is used for easy shardSpec generation. The most
|
||||
* important method is {@link #updateShardSpec(Interval)} which updates the map according to the type of shardSpec.
|
||||
* This class represents a map of (Interval, ShardSpec) and is used for easy shardSpec generation.
|
||||
*/
|
||||
private interface ShardSpecs
|
||||
static class ShardSpecs
|
||||
{
|
||||
private final Map<Interval, List<ShardSpec>> map;
|
||||
|
||||
ShardSpecs(final Map<Interval, List<ShardSpec>> map)
|
||||
{
|
||||
this.map = map;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the key set of the underlying map.
|
||||
* Return the underlying map.
|
||||
*
|
||||
* @return a set of intervals
|
||||
* @return a map of intervals to shardSpecs
|
||||
*/
|
||||
Collection<Interval> getIntervals();
|
||||
Map<Interval, List<ShardSpec>> getMap()
|
||||
{
|
||||
return map;
|
||||
}
|
||||
|
||||
Set<Interval> getIntervals()
|
||||
{
|
||||
return map.keySet();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a shardSpec for the given interval and input row.
|
||||
*
|
||||
* @param interval interval for shardSpec
|
||||
* @param row input row
|
||||
* @param interval interval for shardSpec
|
||||
* @param row input row
|
||||
*
|
||||
* @return a shardSpec
|
||||
*/
|
||||
ShardSpec getShardSpec(Interval interval, InputRow row);
|
||||
|
||||
/**
|
||||
* Update the shardSpec of the given interval. When the type of shardSpecs is extendable, this method must update
|
||||
* the shardSpec properly. For example, if the {@link NumberedShardSpec} is used, an implementation of this method
|
||||
* may replace the shardSpec of the given interval with a new one having a greater partitionNum.
|
||||
*
|
||||
* @param interval interval for shardSpec to be updated
|
||||
*/
|
||||
void updateShardSpec(Interval interval);
|
||||
ShardSpec getShardSpec(Interval interval, InputRow row)
|
||||
{
|
||||
final List<ShardSpec> shardSpecs = map.get(interval);
|
||||
if (shardSpecs == null || shardSpecs.isEmpty()) {
|
||||
throw new ISE("Failed to get shardSpec for interval[%s]", interval);
|
||||
}
|
||||
return shardSpecs.get(0).getLookup(shardSpecs).getShardSpec(row.getTimestampFromEpoch(), row);
|
||||
}
|
||||
}
|
||||
|
||||
public static class IndexIngestionSpec extends IngestionSpec<IndexIOConfig, IndexTuningConfig>
|
||||
|
@ -975,8 +978,8 @@ public class IndexTask extends AbstractTask
|
|||
: targetPartitionSize);
|
||||
this.maxRowsInMemory = maxRowsInMemory == null ? DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
|
||||
this.maxTotalRows = maxTotalRows == null
|
||||
? DEFAULT_MAX_TOTAL_ROWS
|
||||
: maxTotalRows;
|
||||
? DEFAULT_MAX_TOTAL_ROWS
|
||||
: maxTotalRows;
|
||||
this.numShards = numShards == null || numShards.equals(-1) ? null : numShards;
|
||||
this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec;
|
||||
this.maxPendingPersists = maxPendingPersists == null ? DEFAULT_MAX_PENDING_PERSISTS : maxPendingPersists;
|
||||
|
|
|
@ -143,8 +143,7 @@ public class IndexTaskTest
|
|||
createTuningConfig(2, null, false, true),
|
||||
false
|
||||
),
|
||||
null,
|
||||
jsonMapper
|
||||
null
|
||||
);
|
||||
|
||||
final List<DataSegment> segments = runTask(indexTask);
|
||||
|
@ -153,13 +152,13 @@ public class IndexTaskTest
|
|||
|
||||
Assert.assertEquals("test", segments.get(0).getDataSource());
|
||||
Assert.assertEquals(new Interval("2014/P1D"), segments.get(0).getInterval());
|
||||
Assert.assertTrue(segments.get(0).getShardSpec().getClass().equals(HashBasedNumberedShardSpec.class));
|
||||
Assert.assertEquals(HashBasedNumberedShardSpec.class, segments.get(0).getShardSpec().getClass());
|
||||
Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum());
|
||||
Assert.assertEquals(2, ((NumberedShardSpec) segments.get(0).getShardSpec()).getPartitions());
|
||||
|
||||
Assert.assertEquals("test", segments.get(1).getDataSource());
|
||||
Assert.assertEquals(new Interval("2014/P1D"), segments.get(1).getInterval());
|
||||
Assert.assertTrue(segments.get(1).getShardSpec().getClass().equals(HashBasedNumberedShardSpec.class));
|
||||
Assert.assertEquals(HashBasedNumberedShardSpec.class, segments.get(1).getShardSpec().getClass());
|
||||
Assert.assertEquals(1, segments.get(1).getShardSpec().getPartitionNum());
|
||||
Assert.assertEquals(2, ((NumberedShardSpec) segments.get(1).getShardSpec()).getPartitions());
|
||||
}
|
||||
|
@ -187,22 +186,23 @@ public class IndexTaskTest
|
|||
createTuningConfig(2, null, true, false),
|
||||
false
|
||||
),
|
||||
null,
|
||||
jsonMapper
|
||||
null
|
||||
);
|
||||
|
||||
Assert.assertEquals(indexTask.getId(), indexTask.getGroupId());
|
||||
|
||||
final List<DataSegment> segments = runTask(indexTask);
|
||||
|
||||
Assert.assertEquals(2, segments.size());
|
||||
|
||||
Assert.assertEquals("test", segments.get(0).getDataSource());
|
||||
Assert.assertEquals(new Interval("2014/P1D"), segments.get(0).getInterval());
|
||||
Assert.assertTrue(segments.get(0).getShardSpec().getClass().equals(NumberedShardSpec.class));
|
||||
Assert.assertEquals(NumberedShardSpec.class, segments.get(0).getShardSpec().getClass());
|
||||
Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum());
|
||||
|
||||
Assert.assertEquals("test", segments.get(1).getDataSource());
|
||||
Assert.assertEquals(new Interval("2014/P1D"), segments.get(1).getInterval());
|
||||
Assert.assertTrue(segments.get(1).getShardSpec().getClass().equals(NumberedShardSpec.class));
|
||||
Assert.assertEquals(NumberedShardSpec.class, segments.get(1).getShardSpec().getClass());
|
||||
Assert.assertEquals(1, segments.get(1).getShardSpec().getPartitionNum());
|
||||
}
|
||||
|
||||
|
@ -232,8 +232,7 @@ public class IndexTaskTest
|
|||
createTuningConfig(10, null, false, true),
|
||||
false
|
||||
),
|
||||
null,
|
||||
jsonMapper
|
||||
null
|
||||
);
|
||||
|
||||
List<DataSegment> segments = runTask(indexTask);
|
||||
|
@ -267,8 +266,7 @@ public class IndexTaskTest
|
|||
createTuningConfig(50, null, false, true),
|
||||
false
|
||||
),
|
||||
null,
|
||||
jsonMapper
|
||||
null
|
||||
);
|
||||
|
||||
final List<DataSegment> segments = runTask(indexTask);
|
||||
|
@ -298,8 +296,7 @@ public class IndexTaskTest
|
|||
createTuningConfig(null, 1, false, true),
|
||||
false
|
||||
),
|
||||
null,
|
||||
jsonMapper
|
||||
null
|
||||
);
|
||||
|
||||
final List<DataSegment> segments = runTask(indexTask);
|
||||
|
@ -335,10 +332,11 @@ public class IndexTaskTest
|
|||
createTuningConfig(2, null, false, false),
|
||||
true
|
||||
),
|
||||
null,
|
||||
jsonMapper
|
||||
null
|
||||
);
|
||||
|
||||
Assert.assertEquals("index_append_test", indexTask.getGroupId());
|
||||
|
||||
final List<DataSegment> segments = runTask(indexTask);
|
||||
|
||||
Assert.assertEquals(2, segmentAllocatePartitionCounter);
|
||||
|
@ -381,8 +379,7 @@ public class IndexTaskTest
|
|||
createTuningConfig(2, null, false, true),
|
||||
false
|
||||
),
|
||||
null,
|
||||
jsonMapper
|
||||
null
|
||||
);
|
||||
|
||||
final List<DataSegment> segments = runTask(indexTask);
|
||||
|
@ -442,8 +439,7 @@ public class IndexTaskTest
|
|||
createTuningConfig(2, null, false, true),
|
||||
false
|
||||
),
|
||||
null,
|
||||
jsonMapper
|
||||
null
|
||||
);
|
||||
|
||||
final List<DataSegment> segments = runTask(indexTask);
|
||||
|
@ -492,8 +488,7 @@ public class IndexTaskTest
|
|||
createTuningConfig(2, null, false, true),
|
||||
false
|
||||
),
|
||||
null,
|
||||
jsonMapper
|
||||
null
|
||||
);
|
||||
|
||||
final List<DataSegment> segments = runTask(indexTask);
|
||||
|
@ -537,8 +532,7 @@ public class IndexTaskTest
|
|||
createTuningConfig(2, 2, 2, null, false, false, true),
|
||||
false
|
||||
),
|
||||
null,
|
||||
jsonMapper
|
||||
null
|
||||
);
|
||||
|
||||
final List<DataSegment> segments = runTask(indexTask);
|
||||
|
@ -552,7 +546,7 @@ public class IndexTaskTest
|
|||
|
||||
Assert.assertEquals("test", segment.getDataSource());
|
||||
Assert.assertEquals(expectedInterval, segment.getInterval());
|
||||
Assert.assertTrue(segment.getShardSpec().getClass().equals(NumberedShardSpec.class));
|
||||
Assert.assertEquals(NumberedShardSpec.class, segment.getShardSpec().getClass());
|
||||
Assert.assertEquals(expectedPartitionNum, segment.getShardSpec().getPartitionNum());
|
||||
}
|
||||
}
|
||||
|
@ -580,8 +574,7 @@ public class IndexTaskTest
|
|||
createTuningConfig(3, 2, 2, null, false, true, true),
|
||||
false
|
||||
),
|
||||
null,
|
||||
jsonMapper
|
||||
null
|
||||
);
|
||||
|
||||
final List<DataSegment> segments = runTask(indexTask);
|
||||
|
@ -622,8 +615,7 @@ public class IndexTaskTest
|
|||
createTuningConfig(3, 2, 2, null, false, false, true),
|
||||
false
|
||||
),
|
||||
null,
|
||||
jsonMapper
|
||||
null
|
||||
);
|
||||
|
||||
final List<DataSegment> segments = runTask(indexTask);
|
||||
|
@ -636,7 +628,7 @@ public class IndexTaskTest
|
|||
|
||||
Assert.assertEquals("test", segment.getDataSource());
|
||||
Assert.assertEquals(expectedInterval, segment.getInterval());
|
||||
Assert.assertTrue(segment.getShardSpec().getClass().equals(NumberedShardSpec.class));
|
||||
Assert.assertEquals(NumberedShardSpec.class, segment.getShardSpec().getClass());
|
||||
Assert.assertEquals(i, segment.getShardSpec().getPartitionNum());
|
||||
}
|
||||
}
|
||||
|
@ -698,8 +690,7 @@ public class IndexTaskTest
|
|||
null,
|
||||
null,
|
||||
parseExceptionIgnoreSpec,
|
||||
null,
|
||||
jsonMapper
|
||||
null
|
||||
);
|
||||
|
||||
final List<DataSegment> segments = runTask(indexTask);
|
||||
|
@ -752,8 +743,7 @@ public class IndexTaskTest
|
|||
null,
|
||||
null,
|
||||
parseExceptionIgnoreSpec,
|
||||
null,
|
||||
jsonMapper
|
||||
null
|
||||
);
|
||||
|
||||
runTask(indexTask);
|
||||
|
@ -812,8 +802,7 @@ public class IndexTaskTest
|
|||
null,
|
||||
null,
|
||||
parseExceptionIgnoreSpec,
|
||||
null,
|
||||
jsonMapper
|
||||
null
|
||||
);
|
||||
|
||||
final List<DataSegment> segments = runTask(indexTask);
|
||||
|
@ -883,8 +872,7 @@ public class IndexTaskTest
|
|||
null,
|
||||
null,
|
||||
parseExceptionIgnoreSpec,
|
||||
null,
|
||||
jsonMapper
|
||||
null
|
||||
);
|
||||
|
||||
runTask(indexTask);
|
||||
|
|
|
@ -187,8 +187,7 @@ public class TaskSerdeTest
|
|||
new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true),
|
||||
new IndexTask.IndexTuningConfig(10000, 10, null, 9999, null, indexSpec, 3, true, true, false, null, null)
|
||||
),
|
||||
null,
|
||||
jsonMapper
|
||||
null
|
||||
);
|
||||
|
||||
final String json = jsonMapper.writeValueAsString(task);
|
||||
|
@ -250,8 +249,7 @@ public class TaskSerdeTest
|
|||
new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true),
|
||||
new IndexTask.IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null)
|
||||
),
|
||||
null,
|
||||
jsonMapper
|
||||
null
|
||||
);
|
||||
|
||||
for (final Module jacksonModule : new FirehoseModule().getJacksonModules()) {
|
||||
|
|
|
@ -657,8 +657,7 @@ public class TaskLifecycleTest
|
|||
new IndexTask.IndexIOConfig(new MockFirehoseFactory(false), false),
|
||||
new IndexTask.IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null)
|
||||
),
|
||||
null,
|
||||
MAPPER
|
||||
null
|
||||
);
|
||||
|
||||
final Optional<TaskStatus> preRunTaskStatus = tsqa.getStatus(indexTask.getId());
|
||||
|
@ -715,8 +714,7 @@ public class TaskLifecycleTest
|
|||
new IndexTask.IndexIOConfig(new MockExceptionalFirehoseFactory(), false),
|
||||
new IndexTask.IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null)
|
||||
),
|
||||
null,
|
||||
MAPPER
|
||||
null
|
||||
);
|
||||
|
||||
final TaskStatus status = runTask(indexTask);
|
||||
|
@ -1080,8 +1078,7 @@ public class TaskLifecycleTest
|
|||
new IndexTask.IndexIOConfig(new MockFirehoseFactory(false), false),
|
||||
new IndexTask.IndexTuningConfig(10000, 10, null, null, null, indexSpec, null, false, null, null, null, null)
|
||||
),
|
||||
null,
|
||||
MAPPER
|
||||
null
|
||||
);
|
||||
|
||||
final long startTime = System.currentTimeMillis();
|
||||
|
|
Loading…
Reference in New Issue