Early publishing segments in the middle of data ingestion (#4238)

* Early publishing segments in the middle of data ingestion

* Remove unnecessary logs

* Address comments

* Refactoring the patch according to #4292 and address comments

* Set the total shard number of NumberedShardSpec to 0

* refactoring

* Address comments

* Fix tests

* Address comments

* Fix sync problem of committer and retry push only

* Fix doc

* Fix build failure

* Address comments

* Fix compilation failure

* Fix transient test failure
Jihoon Son 2017-07-11 14:35:36 +09:00 committed by Gian Merlino
parent 8ffac8f5e7
commit cc20260078
24 changed files with 1009 additions and 438 deletions


@ -32,30 +32,6 @@ They each represent an axis of the data that we've chosen to slice across.
Metrics are usually numeric values, and computations include operations such as count, sum, and mean.
Also known as measures in standard OLAP terminology.
## Roll-up
The individual events in our example data set are not very interesting because there may be trillions of such events.
However, summarizations of this type of data can yield many useful insights.
Druid summarizes this raw data at ingestion time using a process we refer to as "roll-up".
Roll-up is a first-level aggregation operation over a selected set of dimensions, equivalent to (in pseudocode):
GROUP BY timestamp, publisher, advertiser, gender, country
:: impressions = COUNT(1), clicks = SUM(click), revenue = SUM(price)
The compacted version of our original raw data looks something like this:
timestamp publisher advertiser gender country impressions clicks revenue
2011-01-01T01:00:00Z ultratrimfast.com google.com Male USA 1800 25 15.70
2011-01-01T01:00:00Z bieberfever.com google.com Male USA 2912 42 29.18
2011-01-01T02:00:00Z ultratrimfast.com google.com Male UK 1953 17 17.31
2011-01-01T02:00:00Z bieberfever.com google.com Male UK 3194 170 34.01
In practice, we see that rolling up data can dramatically reduce the size of data that needs to be stored (up to a factor of 100).
Druid will roll up data as it is ingested to minimize the amount of raw data that needs to be stored.
This storage reduction does come at a cost: as we roll up data, we lose the ability to query individual events. Phrased another way,
the rollup granularity is the minimum granularity at which you will be able to explore data, and events are floored to this granularity.
Hence, Druid ingestion specs define this granularity as the `queryGranularity` of the data. The lowest supported `queryGranularity` is millisecond.
## Sharding the Data
Druid shards are called `segments` and Druid always first shards data by time. In our compacted data set, we can create two segments, one for each hour of data.
@ -80,6 +56,39 @@ scan segments.
Segments are uniquely identified by a datasource, interval, version, and an optional partition number.
Examining our example segments, the segments are named following this convention: `dataSource_interval_version_partitionNumber`
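As a concrete illustration of this convention (the values below are hypothetical; only the naming pattern comes from the text above):

```java
public class SegmentIdExample
{
  public static void main(String[] args)
  {
    // Hypothetical values; only the dataSource_interval_version_partitionNumber pattern is from the docs.
    final String dataSource = "sample_datasource";
    final String interval = "2011-01-01T01:00:00Z_2011-01-01T02:00:00Z"; // interval start and end
    final String version = "2011-01-01T01:05:00Z";                       // e.g. the acquired task lock's version
    final int partitionNumber = 0;

    final String segmentId = String.format("%s_%s_%s_%d", dataSource, interval, version, partitionNumber);
    System.out.println(segmentId);
    // sample_datasource_2011-01-01T01:00:00Z_2011-01-01T02:00:00Z_2011-01-01T01:05:00Z_0
  }
}
```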
## Roll-up
The individual events in our example data set are not very interesting because there may be trillions of such events.
However, summarizations of this type of data can yield many useful insights.
Druid summarizes this raw data at ingestion time using a process we refer to as "roll-up".
Roll-up is a first-level aggregation operation over a selected set of dimensions, equivalent to (in pseudocode):
GROUP BY timestamp, publisher, advertiser, gender, country
:: impressions = COUNT(1), clicks = SUM(click), revenue = SUM(price)
The compacted version of our original raw data looks something like this:
timestamp publisher advertiser gender country impressions clicks revenue
2011-01-01T01:00:00Z ultratrimfast.com google.com Male USA 1800 25 15.70
2011-01-01T01:00:00Z bieberfever.com google.com Male USA 2912 42 29.18
2011-01-01T02:00:00Z ultratrimfast.com google.com Male UK 1953 17 17.31
2011-01-01T02:00:00Z bieberfever.com google.com Male UK 3194 170 34.01
In practice, we see that rolling up data can dramatically reduce the size of data that needs to be stored (up to a factor of 100).
Druid will roll up data as it is ingested to minimize the amount of raw data that needs to be stored.
This storage reduction does come at a cost: as we roll up data, we lose the ability to query individual events. Phrased another way,
the rollup granularity is the minimum granularity at which you will be able to explore data, and events are floored to this granularity.
Hence, Druid ingestion specs define this granularity as the `queryGranularity` of the data. The lowest supported `queryGranularity` is millisecond.
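To make the roll-up operation above concrete, here is a minimal, self-contained Java sketch of the GROUP BY pseudocode, assuming a simple in-memory map keyed by the floored timestamp plus all dimension values (the class and field names are illustrative, not Druid code):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative toy roll-up; not Druid code.
public class RollupSketch
{
  static class Metrics
  {
    long impressions;
    long clicks;
    double revenue;
  }

  private final Map<String, Metrics> rolledUp = new HashMap<>();

  void add(
      long timestampMillis, String publisher, String advertiser, String gender, String country,
      long click, double price, long queryGranularityMillis
  )
  {
    // Floor the event timestamp to the query granularity, then group by all dimensions.
    final long floored = timestampMillis - (timestampMillis % queryGranularityMillis);
    final String key = floored + "|" + publisher + "|" + advertiser + "|" + gender + "|" + country;
    final Metrics metrics = rolledUp.computeIfAbsent(key, k -> new Metrics());
    metrics.impressions++;      // impressions = COUNT(1)
    metrics.clicks += click;    // clicks = SUM(click)
    metrics.revenue += price;   // revenue = SUM(price)
  }

  public static void main(String[] args)
  {
    final RollupSketch rollup = new RollupSketch();
    final long hour = 3_600_000L;
    // Two raw events in the same hour with identical dimension values collapse into one row.
    rollup.add(1293843600000L, "ultratrimfast.com", "google.com", "Male", "USA", 0, 0.45, hour);
    rollup.add(1293843605000L, "ultratrimfast.com", "google.com", "Male", "USA", 1, 0.62, hour);
    System.out.println(rollup.rolledUp.size()); // prints 1
  }
}
```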
### Roll-up modes
Druid supports two roll-up modes: _perfect roll-up_ and _best-effort roll-up_. In perfect roll-up mode, Druid guarantees that input data are perfectly aggregated at ingestion time. In best-effort roll-up mode, input data might not be perfectly aggregated, so multiple segments may hold rows that, under perfect roll-up, would belong to the same segment because they share the same dimension values and their timestamps fall into the same interval.
Perfect roll-up mode requires an additional preprocessing step to determine intervals and shardSpecs before the actual data ingestion if they are not specified in the ingestionSpec. This preprocessing step usually scans the entire input data, which can increase ingestion time. The [Hadoop indexing task](./ingestion/batch-ingestion.html) always runs in perfect roll-up mode.
By contrast, best-effort roll-up mode doesn't require any preprocessing step, but the ingested data might be larger than it would be with perfect roll-up. All types of [streaming indexing (e.g., realtime index task, Kafka indexing service)](./ingestion/stream-ingestion.html) run in this mode.
Finally, the [native index task](./ingestion/tasks.html) supports both modes; you can choose whichever fits your application.
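The mode selection can be sketched with the following standalone check, which mirrors the `isGuaranteedRollup()` helper added to `IndexTask` in this patch (simplified for illustration, not the actual class):

```java
// Simplified, standalone sketch mirroring IndexTask.isGuaranteedRollup() from this patch.
public class RollupModeCheck
{
  static boolean isGuaranteedRollup(
      boolean forceGuaranteedRollup,
      boolean forceExtendableShardSpecs,
      boolean appendToExisting
  )
  {
    // Perfect roll-up cannot be combined with extendable shardSpecs or appendToExisting.
    if (forceGuaranteedRollup && (forceExtendableShardSpecs || appendToExisting)) {
      throw new IllegalStateException("Perfect rollup cannot be guaranteed with extendable shardSpecs");
    }
    return forceGuaranteedRollup;
  }

  public static void main(String[] args)
  {
    System.out.println(isGuaranteedRollup(true, false, false));  // true: perfect roll-up mode
    System.out.println(isGuaranteedRollup(false, true, false));  // false: best-effort roll-up mode
  }
}
```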
## Indexing the Data


@ -114,10 +114,12 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|type|The task type, this should always be "index".|none|yes|
|targetPartitionSize|Used in sharding. Determines how many rows are in each segment.|5000000|no|
|maxRowsInMemory|Used in determining when intermediate persists to disk should occur.|75000|no|
|maxTotalRows|Total number of rows in segments waiting to be published. Used to determine when an intermediate publish should occur.|150000|no|
|numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if targetPartitionSize is set.|null|no|
|indexSpec|defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no|
|maxPendingPersists|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)|no|
|forceExtendableShardSpecs|Forces use of extendable shardSpecs. Experimental feature intended for use with the [Kafka indexing service extension](../development/extensions-core/kafka-ingestion.html).|false|no|
|forceGuaranteedRollup|Forces [perfect rollup](./design/index.html) to be guaranteed. Perfect rollup optimizes the total size of generated segments and query time, at the cost of increased indexing time. This flag cannot be used together with `appendToExisting` of IOConfig or `forceExtendableShardSpecs`. For more details, see the __Segment publishing modes__ section below.|false|no|
|reportParseExceptions|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|false|no|
|publishTimeout|Milliseconds to wait for publishing segments. It must be >= 0, where 0 means to wait forever.|0|no|
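For reference, the same options can be set programmatically. Below is a hedged sketch that constructs the tuningConfig via the `IndexTask.IndexTuningConfig` constructor as it appears in this patch, using the table's default values (argument order follows the `@JsonCreator` constructor; the `IndexSpec` import path and classpath availability are assumptions):

```java
import io.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import io.druid.segment.IndexSpec;

class TuningConfigExample
{
  // Arguments in @JsonCreator order: targetPartitionSize, maxRowsInMemory, maxTotalRows,
  // rowFlushBoundary (deprecated), numShards, indexSpec, maxPendingPersists,
  // buildV9Directly (deprecated), forceExtendableShardSpecs, forceGuaranteedRollup,
  // reportParseExceptions, publishTimeout.
  static IndexTuningConfig defaultLikeTuningConfig()
  {
    return new IndexTuningConfig(
        5_000_000,       // targetPartitionSize
        75_000,          // maxRowsInMemory
        150_000,         // maxTotalRows
        null,            // rowFlushBoundary (deprecated)
        null,            // numShards
        new IndexSpec(), // indexSpec
        0,               // maxPendingPersists
        null,            // buildV9Directly (deprecated, kept for compatibility)
        false,           // forceExtendableShardSpecs
        false,           // forceGuaranteedRollup
        false,           // reportParseExceptions
        0L               // publishTimeout (0 = wait forever)
    );
  }
}
```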
@ -148,6 +150,16 @@ For Roaring bitmaps:
|type|String|Must be `roaring`.|yes|
|compressRunOnSerialization|Boolean|Use a run-length encoding where it is estimated as more space efficient.|no (default == `true`)|
#### Segment publishing modes
While ingesting data, the Index task creates segments from the input data and publishes them. The Index task supports two segment publishing modes: _bulk publishing mode_ and _incremental publishing mode_, which correspond to [perfect rollup and best-effort rollup](./design/index.html), respectively.
In bulk publishing mode, every segment is published at the very end of the index task. Until then, created segments are stored in the memory and local storage of the node running the index task. As a result, this mode can run into problems when storage capacity is limited and is not recommended for production use.
By contrast, in incremental publishing mode, segments are published incrementally, that is, they can be published in the middle of the index task. More precisely, the index task collects data and stores created segments in the memory and disk of the node running the task until the total number of collected rows exceeds `maxTotalRows`. Once that threshold is exceeded, the index task immediately publishes all segments created up to that moment, cleans up the published segments, and continues ingesting the remaining data.
To enable bulk publishing mode, `forceGuaranteedRollup` should be set to true in the TuningConfig. Note that this option cannot be used together with `forceExtendableShardSpecs` of TuningConfig or `appendToExisting` of IOConfig.
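The trigger for an incremental publish can be summarized with the following self-contained sketch; the parameter names mirror the tuningConfig options, and the method itself is illustrative rather than Druid API:

```java
// Illustrative sketch of the incremental-publish trigger described above; not Druid API.
public class IncrementalPublishCheck
{
  static boolean shouldPublishNow(
      boolean forceGuaranteedRollup,
      long numRowsInCurrentSegment,
      long totalRowsInAppenderator,
      long targetPartitionSize,
      long maxTotalRows
  )
  {
    // Early (incremental) publishing is only allowed in best-effort roll-up mode.
    if (forceGuaranteedRollup) {
      return false;
    }
    // Publish when the current segment is full, or when the task is holding more rows
    // than maxTotalRows across all segments waiting to be published.
    return numRowsInCurrentSegment >= targetPartitionSize || totalRowsInAppenderator >= maxTotalRows;
  }

  public static void main(String[] args)
  {
    System.out.println(shouldPublishNow(false, 10, 160_000, 5_000_000, 150_000)); // true
    System.out.println(shouldPublishNow(true, 10, 160_000, 5_000_000, 150_000));  // false
  }
}
```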
Segment Merging Tasks
---------------------


@ -19,12 +19,12 @@
package io.druid.indexing.appenderator;
import io.druid.data.input.InputRow;
import io.druid.indexing.common.actions.SegmentAllocateAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.realtime.appenderator.SegmentAllocator;
import io.druid.segment.realtime.appenderator.SegmentIdentifier;
import org.joda.time.DateTime;
import java.io.IOException;
@ -44,7 +44,7 @@ public class ActionBasedSegmentAllocator implements SegmentAllocator
@Override
public SegmentIdentifier allocate(
final DateTime timestamp,
final InputRow row,
final String sequenceName,
final String previousSegmentId
) throws IOException
@ -52,7 +52,7 @@ public class ActionBasedSegmentAllocator implements SegmentAllocator
return taskActionClient.submit(
new SegmentAllocateAction(
dataSchema.getDataSource(),
timestamp,
row.getTimestamp(),
dataSchema.getGranularitySpec().getQueryGranularity(),
dataSchema.getGranularitySpec().getSegmentGranularity(),
sequenceName,


@ -33,12 +33,10 @@ import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.google.common.util.concurrent.ListenableFuture;
import io.druid.common.utils.JodaUtils;
import io.druid.data.input.Committer;
import io.druid.data.input.Firehose;
@ -76,9 +74,9 @@ import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.RealtimeMetricsMonitor;
import io.druid.segment.realtime.appenderator.Appenderator;
import io.druid.segment.realtime.appenderator.AppenderatorConfig;
import io.druid.segment.realtime.appenderator.AppenderatorDriver;
import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
import io.druid.segment.realtime.appenderator.Appenderators;
import io.druid.segment.realtime.appenderator.AppenderatorDriver;
import io.druid.segment.realtime.appenderator.SegmentAllocator;
import io.druid.segment.realtime.appenderator.SegmentIdentifier;
import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
@ -90,7 +88,6 @@ import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.NumberedShardSpec;
import io.druid.timeline.partition.ShardSpec;
import io.druid.timeline.partition.ShardSpecLookup;
import org.codehaus.plexus.util.FileUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -99,13 +96,18 @@ import org.joda.time.Period;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class IndexTask extends AbstractTask
{
@ -185,12 +187,12 @@ public class IndexTask extends AbstractTask
// Firehose temporary directory is automatically removed when this IndexTask completes.
FileUtils.forceMkdir(firehoseTempDir);
final Map<Interval, List<ShardSpec>> shardSpecs = determineShardSpecs(toolbox, firehoseFactory, firehoseTempDir);
final ShardSpecs shardSpecs = determineShardSpecs(toolbox, firehoseFactory, firehoseTempDir);
final String version;
final DataSchema dataSchema;
if (determineIntervals) {
Interval interval = JodaUtils.umbrellaInterval(shardSpecs.keySet());
Interval interval = JodaUtils.umbrellaInterval(shardSpecs.getIntervals());
TaskLock lock = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval));
version = lock.getVersion();
dataSchema = ingestionSchema.getDataSchema().withGranularitySpec(
@ -198,7 +200,7 @@ public class IndexTask extends AbstractTask
.getGranularitySpec()
.withIntervals(
JodaUtils.condenseIntervals(
shardSpecs.keySet()
shardSpecs.getIntervals()
)
)
);
@ -214,61 +216,182 @@ public class IndexTask extends AbstractTask
}
}
private static boolean isGuaranteedRollup(IndexIOConfig ioConfig, IndexTuningConfig tuningConfig)
{
Preconditions.checkState(
!(tuningConfig.isForceGuaranteedRollup() &&
(tuningConfig.isForceExtendableShardSpecs() || ioConfig.isAppendToExisting())),
"Perfect rollup cannot be guaranteed with extendable shardSpecs"
);
return tuningConfig.isForceGuaranteedRollup();
}
/**
* Determines the number of shards for each interval using a hash of queryGranularity timestamp + all dimensions (i.e.,
* hash-based partitioning). In the future we may want to also support single-dimension partitioning.
Determines intervals and shardSpecs for the input data. This method first checks whether it must determine intervals and
shardSpecs by itself. Intervals must be determined if they are not specified in {@link GranularitySpec}.
ShardSpecs must be determined if perfect rollup must be guaranteed even though the number of shards is not
specified in {@link IndexTuningConfig}.
<p/>
If neither intervals nor shardSpecs have to be determined, this method simply returns {@link ShardSpecs} for the
given intervals. Here, if {@link IndexTuningConfig#numShards} is not specified, {@link NumberedShardSpec} is used.
<p/>
If either intervals or shardSpecs need to be determined, this method reads the entire input to determine them.
If perfect rollup must be guaranteed, {@link HashBasedNumberedShardSpec} is used for hash partitioning
of the input data. In the future we may want to also support single-dimension partitioning.
*
* @return generated {@link ShardSpecs} representing a map of intervals and corresponding shard specs
*/
private Map<Interval, List<ShardSpec>> determineShardSpecs(
private ShardSpecs determineShardSpecs(
final TaskToolbox toolbox,
final FirehoseFactory firehoseFactory,
final File firehoseTempDir
) throws IOException
{
final ObjectMapper jsonMapper = toolbox.getObjectMapper();
final GranularitySpec granularitySpec = ingestionSchema.getDataSchema().getGranularitySpec();
final Granularity queryGranularity = granularitySpec.getQueryGranularity();
final boolean determineNumPartitions = ingestionSchema.getTuningConfig().getNumShards() == null;
final boolean determineIntervals = !ingestionSchema.getDataSchema()
.getGranularitySpec()
.bucketIntervals()
.isPresent();
final IndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
final IndexIOConfig ioConfig = ingestionSchema.getIOConfig();
final Map<Interval, List<ShardSpec>> shardSpecs = Maps.newHashMap();
final GranularitySpec granularitySpec = ingestionSchema.getDataSchema().getGranularitySpec();
final boolean determineIntervals = !granularitySpec.bucketIntervals().isPresent();
// Guaranteed rollup means that this index task guarantees the 'perfect rollup' across the entire data set.
final boolean guaranteedRollup = isGuaranteedRollup(ioConfig, tuningConfig);
final boolean determineNumPartitions = tuningConfig.getNumShards() == null && guaranteedRollup;
final boolean useExtendableShardSpec = !guaranteedRollup;
// if we were given number of shards per interval and the intervals, we don't need to scan the data
if (!determineNumPartitions && !determineIntervals) {
log.info("numShards and intervals provided, skipping determine partition scan");
final SortedSet<Interval> intervals = ingestionSchema.getDataSchema()
.getGranularitySpec()
.bucketIntervals()
.get();
final int numShards = ingestionSchema.getTuningConfig().getNumShards();
for (Interval interval : intervals) {
final List<ShardSpec> intervalShardSpecs = Lists.newArrayListWithCapacity(numShards);
if (numShards > 1) {
for (int i = 0; i < numShards; i++) {
intervalShardSpecs.add(new HashBasedNumberedShardSpec(i, numShards, null, jsonMapper));
}
log.info("Skipping determine partition scan");
return createShardSpecWithoutInputScan(jsonMapper, granularitySpec, tuningConfig, useExtendableShardSpec);
} else {
intervalShardSpecs.add(NoneShardSpec.instance());
}
shardSpecs.put(interval, intervalShardSpecs);
}
return shardSpecs;
}
// determine intervals containing data and prime HLL collectors
final Map<Interval, Optional<HyperLogLogCollector>> hllCollectors = Maps.newHashMap();
int thrownAway = 0;
int unparseable = 0;
return createShardSpecsFromInput(
jsonMapper,
ingestionSchema,
firehoseFactory,
firehoseTempDir,
granularitySpec,
tuningConfig,
determineIntervals,
determineNumPartitions,
useExtendableShardSpec
);
}
}
private static ShardSpecs createShardSpecWithoutInputScan(
ObjectMapper jsonMapper,
GranularitySpec granularitySpec,
IndexTuningConfig tuningConfig,
boolean useExtendableShardSpec
)
{
final int numShards = tuningConfig.getNumShards() == null ? 1 : tuningConfig.getNumShards();
final BiFunction<Integer, Integer, ShardSpec> shardSpecCreateFn = getShardSpecCreateFunction(
useExtendableShardSpec,
numShards,
jsonMapper
);
final Map<Interval, List<ShardSpec>> intervalToShardSpecs = new HashMap<>();
for (Interval interval : granularitySpec.bucketIntervals().get()) {
final List<ShardSpec> intervalShardSpecs = IntStream.range(0, numShards)
.mapToObj(
shardId -> shardSpecCreateFn.apply(shardId, numShards)
)
.collect(Collectors.toList());
intervalToShardSpecs.put(interval, intervalShardSpecs);
}
if (useExtendableShardSpec) {
return createExtendableShardSpecs(intervalToShardSpecs);
} else {
return createNonExtendableShardSpecs(intervalToShardSpecs);
}
}
private static ShardSpecs createShardSpecsFromInput(
ObjectMapper jsonMapper,
IndexIngestionSpec ingestionSchema,
FirehoseFactory firehoseFactory,
File firehoseTempDir,
GranularitySpec granularitySpec,
IndexTuningConfig tuningConfig,
boolean determineIntervals,
boolean determineNumPartitions,
boolean useExtendableShardSpec
) throws IOException
{
log.info("Determining intervals and shardSpecs");
long determineShardSpecsStartMillis = System.currentTimeMillis();
try (final Firehose firehose = firehoseFactory.connect(
ingestionSchema.getDataSchema().getParser(),
firehoseTempDir)
final Map<Interval, Optional<HyperLogLogCollector>> hllCollectors = collectIntervalsAndShardSpecs(
jsonMapper,
ingestionSchema,
firehoseFactory,
firehoseTempDir,
granularitySpec,
determineIntervals,
determineNumPartitions
);
final Map<Interval, List<ShardSpec>> intervalToShardSpecs = new HashMap<>();
final int defaultNumShards = tuningConfig.getNumShards() == null ? 1 : tuningConfig.getNumShards();
for (final Map.Entry<Interval, Optional<HyperLogLogCollector>> entry : hllCollectors.entrySet()) {
final Interval interval = entry.getKey();
final Optional<HyperLogLogCollector> collector = entry.getValue();
final int numShards;
if (determineNumPartitions) {
final long numRows = new Double(collector.get().estimateCardinality()).longValue();
numShards = (int) Math.ceil((double) numRows / tuningConfig.getTargetPartitionSize());
log.info("Estimated [%,d] rows of data for interval [%s], creating [%,d] shards", numRows, interval, numShards);
} else {
numShards = defaultNumShards;
log.info("Creating [%,d] shards for interval [%s]", numShards, interval);
}
final BiFunction<Integer, Integer, ShardSpec> shardSpecCreateFn = getShardSpecCreateFunction(
useExtendableShardSpec,
numShards,
jsonMapper
);
final List<ShardSpec> intervalShardSpecs = IntStream.range(0, numShards)
.mapToObj(
shardId -> shardSpecCreateFn.apply(shardId, numShards)
).collect(Collectors.toList());
intervalToShardSpecs.put(interval, intervalShardSpecs);
}
log.info("Found intervals and shardSpecs in %,dms", System.currentTimeMillis() - determineShardSpecsStartMillis);
if (useExtendableShardSpec) {
return createExtendableShardSpecs(intervalToShardSpecs);
} else {
return createNonExtendableShardSpecs(intervalToShardSpecs);
}
}
private static Map<Interval, Optional<HyperLogLogCollector>> collectIntervalsAndShardSpecs(
ObjectMapper jsonMapper,
IndexIngestionSpec ingestionSchema,
FirehoseFactory firehoseFactory,
File firehoseTempDir,
GranularitySpec granularitySpec,
boolean determineIntervals,
boolean determineNumPartitions
) throws IOException
{
final Map<Interval, Optional<HyperLogLogCollector>> hllCollectors = new TreeMap<>(
Comparators.intervalsByStartThenEnd()
);
int thrownAway = 0;
int unparseable = 0;
final Granularity queryGranularity = granularitySpec.getQueryGranularity();
try (
final Firehose firehose = firehoseFactory.connect(ingestionSchema.getDataSchema().getParser(), firehoseTempDir)
) {
while (firehose.hasMore()) {
try {
@ -291,15 +414,7 @@ public class IndexTask extends AbstractTask
interval = optInterval.get();
}
if (!determineNumPartitions) {
// we don't need to determine partitions but we still need to determine intervals, so add an Optional.absent()
// for the interval and don't instantiate a HLL collector
if (!hllCollectors.containsKey(interval)) {
hllCollectors.put(interval, Optional.<HyperLogLogCollector>absent());
}
continue;
}
if (determineNumPartitions) {
if (!hllCollectors.containsKey(interval)) {
hllCollectors.put(interval, Optional.of(HyperLogLogCollector.makeLatestCollector()));
}
@ -308,7 +423,15 @@ public class IndexTask extends AbstractTask
queryGranularity.bucketStart(inputRow.getTimestamp()).getMillis(),
inputRow
);
hllCollectors.get(interval).get().add(hashFunction.hashBytes(jsonMapper.writeValueAsBytes(groupKey)).asBytes());
hllCollectors.get(interval).get()
.add(hashFunction.hashBytes(jsonMapper.writeValueAsBytes(groupKey)).asBytes());
} else {
// we don't need to determine partitions but we still need to determine intervals, so add an Optional.absent()
// for the interval and don't instantiate a HLL collector
if (!hllCollectors.containsKey(interval)) {
hllCollectors.put(interval, Optional.absent());
}
}
}
catch (ParseException e) {
if (ingestionSchema.getTuningConfig().isReportParseExceptions()) {
@ -327,45 +450,121 @@ public class IndexTask extends AbstractTask
if (unparseable > 0) {
log.warn("Unable to parse [%,d] events", unparseable);
}
return hllCollectors;
}
final ImmutableSortedMap<Interval, Optional<HyperLogLogCollector>> sortedMap = ImmutableSortedMap.copyOf(
hllCollectors,
Comparators.intervalsByStartThenEnd()
private static ShardSpecs createNonExtendableShardSpecs(Map<Interval, List<ShardSpec>> intervalToShardSpecs)
{
return new ShardSpecs()
{
@Override
public Collection<Interval> getIntervals()
{
return intervalToShardSpecs.keySet();
}
@Override
public ShardSpec getShardSpec(Interval interval, InputRow row)
{
final List<ShardSpec> shardSpecs = intervalToShardSpecs.get(interval);
if (shardSpecs == null || shardSpecs.isEmpty()) {
throw new ISE("Failed to get shardSpec for interval[%s]", interval);
}
return shardSpecs.get(0).getLookup(shardSpecs).getShardSpec(row.getTimestampFromEpoch(), row);
}
@Override
public void updateShardSpec(Interval interval)
{
// do nothing
}
};
}
private static ShardSpecs createExtendableShardSpecs(Map<Interval, List<ShardSpec>> intervalToShardSpec)
{
final Map<Interval, ShardSpec> shardSpecMap = new HashMap<>(intervalToShardSpec.size());
intervalToShardSpec.forEach((interval, shardSpecs) -> {
Preconditions.checkState(shardSpecs.size() == 1);
shardSpecMap.put(interval, shardSpecs.get(0));
});
return new ShardSpecs()
{
@Override
public Collection<Interval> getIntervals()
{
return shardSpecMap.keySet();
}
@Override
public ShardSpec getShardSpec(Interval interval, InputRow row)
{
return shardSpecMap.get(interval);
}
@Override
public void updateShardSpec(Interval interval)
{
final ShardSpec shardSpec = shardSpecMap.get(interval);
Preconditions.checkState(
shardSpec instanceof NumberedShardSpec,
"shardSpec[%s] must be NumberedShardSpec",
shardSpec.getClass().getCanonicalName()
);
final NumberedShardSpec previous = (NumberedShardSpec) shardSpec;
Preconditions.checkNotNull(previous, "previous shardSpec for interval[%s] is null", interval);
shardSpecMap.put(interval, new NumberedShardSpec(previous.getPartitionNum() + 1, previous.getPartitions()));
}
};
}
for (final Map.Entry<Interval, Optional<HyperLogLogCollector>> entry : sortedMap.entrySet()) {
final Interval interval = entry.getKey();
final Optional<HyperLogLogCollector> collector = entry.getValue();
final int numShards;
if (determineNumPartitions) {
final long numRows = new Double(collector.get().estimateCardinality()).longValue();
numShards = (int) Math.ceil((double) numRows / ingestionSchema.getTuningConfig().getTargetPartitionSize());
log.info("Estimated [%,d] rows of data for interval [%s], creating [%,d] shards", numRows, interval, numShards);
private static BiFunction<Integer, Integer, ShardSpec> getShardSpecCreateFunction(
boolean useExtendableShardSpec,
Integer numShards,
ObjectMapper jsonMapper
)
{
if (useExtendableShardSpec) {
// 0 partitions means there are no core partitions. See NumberedPartitionChunk.isStart() and
// NumberedPartitionChunk.isEnd().
return (shardId, notUsed) -> new NumberedShardSpec(shardId, 0);
} else {
numShards = ingestionSchema.getTuningConfig().getNumShards();
log.info("Creating [%,d] shards for interval [%s]", numShards, interval);
if (numShards == null) {
throw new ISE("numShards must not be null");
}
final List<ShardSpec> intervalShardSpecs = Lists.newArrayListWithCapacity(numShards);
if (numShards > 1) {
for (int i = 0; i < numShards; i++) {
intervalShardSpecs.add(new HashBasedNumberedShardSpec(i, numShards, null, jsonMapper));
}
if (numShards == 1) {
return (shardId, totalNumShards) -> NoneShardSpec.instance();
} else {
intervalShardSpecs.add(NoneShardSpec.instance());
return (shardId, totalNumShards) -> new HashBasedNumberedShardSpec(shardId, totalNumShards, null, jsonMapper);
}
shardSpecs.put(interval, intervalShardSpecs);
}
log.info("Found intervals and shardSpecs in %,dms", System.currentTimeMillis() - determineShardSpecsStartMillis);
return shardSpecs;
}
/**
* This method reads input data row by row and adds the read row to a proper segment using {@link AppenderatorDriver}.
If there is no segment for the row, a new one is created. Segments can be published in the middle of reading inputs
if one of the conditions below is satisfied.
*
* <ul>
* <li>
* If the number of rows in a segment exceeds {@link IndexTuningConfig#targetPartitionSize}
* </li>
* <li>
* If the number of rows added to {@link AppenderatorDriver} so far exceeds {@link IndexTuningConfig#maxTotalRows}
* </li>
* </ul>
*
* At the end of this method, all the remaining segments are published.
*
* @return true if generated segments are successfully published, otherwise false
*/
private boolean generateAndPublishSegments(
final TaskToolbox toolbox,
final DataSchema dataSchema,
final Map<Interval, List<ShardSpec>> shardSpecs,
final ShardSpecs shardSpecs,
final String version,
final FirehoseFactory firehoseFactory,
final File firehoseTempDir
@ -377,7 +576,6 @@ public class IndexTask extends AbstractTask
dataSchema, new RealtimeIOConfig(null, null, null), null
);
final FireDepartmentMetrics fireDepartmentMetrics = fireDepartmentForMetrics.getMetrics();
final Map<String, ShardSpec> sequenceNameToShardSpecMap = Maps.newHashMap();
if (toolbox.getMonitorScheduler() != null) {
toolbox.getMonitorScheduler().addMonitor(
@ -388,33 +586,43 @@ public class IndexTask extends AbstractTask
);
}
final IndexIOConfig ioConfig = ingestionSchema.getIOConfig();
final IndexTuningConfig tuningConfig = ingestionSchema.tuningConfig;
final long publishTimeout = tuningConfig.getPublishTimeout();
final long maxRowsInAppenderator = tuningConfig.getMaxTotalRows();
final int maxRowsInSegment = tuningConfig.getTargetPartitionSize() == null
? Integer.MAX_VALUE
: tuningConfig.getTargetPartitionSize();
final boolean isGuaranteedRollup = isGuaranteedRollup(ioConfig, tuningConfig);
final SegmentAllocator segmentAllocator;
if (ingestionSchema.getIOConfig().isAppendToExisting()) {
if (ioConfig.isAppendToExisting()) {
segmentAllocator = new ActionBasedSegmentAllocator(toolbox.getTaskActionClient(), dataSchema);
} else {
segmentAllocator = new SegmentAllocator()
{
@Override
public SegmentIdentifier allocate(DateTime timestamp, String sequenceName, String previousSegmentId)
throws IOException
{
Optional<Interval> interval = granularitySpec.bucketInterval(timestamp);
if (!interval.isPresent()) {
segmentAllocator = (row, sequenceName, previousSegmentId) -> {
final DateTime timestamp = row.getTimestamp();
Optional<Interval> maybeInterval = granularitySpec.bucketInterval(timestamp);
if (!maybeInterval.isPresent()) {
throw new ISE("Could not find interval for timestamp [%s]", timestamp);
}
ShardSpec shardSpec = sequenceNameToShardSpecMap.get(sequenceName);
final Interval interval = maybeInterval.get();
final ShardSpec shardSpec = shardSpecs.getShardSpec(interval, row);
if (shardSpec == null) {
throw new ISE("Could not find ShardSpec for sequenceName [%s]", sequenceName);
throw new ISE("Could not find shardSpec for interval[%s]", interval);
}
return new SegmentIdentifier(getDataSource(), interval.get(), version, shardSpec);
}
return new SegmentIdentifier(getDataSource(), interval, version, shardSpec);
};
}
final TransactionalSegmentPublisher publisher = (segments, commitMetadata) -> {
final SegmentTransactionalInsertAction action = new SegmentTransactionalInsertAction(segments, null, null);
return toolbox.getTaskActionClient().submit(action).isSuccess();
};
try (
final Appenderator appenderator = newAppenderator(fireDepartmentMetrics, toolbox, dataSchema);
final Appenderator appenderator = newAppenderator(fireDepartmentMetrics, toolbox, dataSchema, tuningConfig);
final AppenderatorDriver driver = newDriver(
appenderator,
toolbox,
@ -424,7 +632,6 @@ public class IndexTask extends AbstractTask
final Firehose firehose = firehoseFactory.connect(dataSchema.getParser(), firehoseTempDir)
) {
final Supplier<Committer> committerSupplier = Committers.supplierFromFirehose(firehose);
final Map<Interval, ShardSpecLookup> shardSpecLookups = Maps.newHashMap();
if (driver.startJob() != null) {
driver.clear();
@ -446,41 +653,40 @@ public class IndexTask extends AbstractTask
}
final Interval interval = optInterval.get();
if (!shardSpecLookups.containsKey(interval)) {
final List<ShardSpec> intervalShardSpecs = shardSpecs.get(interval);
if (intervalShardSpecs == null || intervalShardSpecs.isEmpty()) {
throw new ISE("Failed to get shardSpec for interval[%s]", interval);
}
shardSpecLookups.put(interval, intervalShardSpecs.get(0).getLookup(intervalShardSpecs));
}
final ShardSpec shardSpec = shardSpecLookups.get(interval)
.getShardSpec(inputRow.getTimestampFromEpoch(), inputRow);
final String sequenceName = StringUtils.format("index_%s_%s_%d", interval, version, shardSpec.getPartitionNum());
if (!sequenceNameToShardSpecMap.containsKey(sequenceName)) {
final ShardSpec shardSpecForPublishing = ingestionSchema.getTuningConfig().isForceExtendableShardSpecs()
|| ingestionSchema.getIOConfig().isAppendToExisting()
? new NumberedShardSpec(
shardSpec.getPartitionNum(),
shardSpecs.get(interval).size()
)
: shardSpec;
sequenceNameToShardSpecMap.put(sequenceName, shardSpecForPublishing);
}
final ShardSpec shardSpec = shardSpecs.getShardSpec(interval, inputRow);
final String sequenceName = Appenderators.getSequenceName(interval, version, shardSpec);
final AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName, committerSupplier);
if (!addResult.isOk()) {
throw new ISE("Could not allocate segment for row with timestamp[%s]", inputRow.getTimestamp());
if (addResult.isOk()) {
// Incremental segment publishing is allowed only when rollup doesn't have to be perfect.
if (!isGuaranteedRollup &&
(addResult.getNumRowsInSegment() >= maxRowsInSegment ||
addResult.getTotalNumRowsInAppenderator() >= maxRowsInAppenderator)) {
// There can be segments waiting to be published even though no more rows will be added to them.
// If those segments are not published here, the available space in the appenderator stays small,
// which makes the generated segments smaller.
final SegmentsAndMetadata published = awaitPublish(
driver.publishAll(
publisher,
committerSupplier.get()
),
publishTimeout
);
published.getSegments().forEach(segment -> shardSpecs.updateShardSpec(segment.getInterval()));
// Even though IndexTask uses NoopHandoffNotifier, which does nothing for segment handoff,
// the code below is needed to update the total number of rows added to the appenderator so far.
// See AppenderatorDriver.registerHandoff() and Appenderator.drop().
// A hard-coded timeout is used here because the get() below is expected to return immediately.
driver.registerHandoff(published).get(30, TimeUnit.SECONDS);
}
} else {
throw new ISE("Failed to add a row with timestamp[%s]", inputRow.getTimestamp());
}
fireDepartmentMetrics.incrementProcessed();
}
catch (ParseException e) {
if (ingestionSchema.getTuningConfig().isReportParseExceptions()) {
if (tuningConfig.isReportParseExceptions()) {
throw e;
} else {
fireDepartmentMetrics.incrementUnparseable();
@ -492,31 +698,13 @@ public class IndexTask extends AbstractTask
driver.persist(committerSupplier.get());
}
final TransactionalSegmentPublisher publisher = new TransactionalSegmentPublisher()
{
@Override
public boolean publishSegments(Set<DataSegment> segments, Object commitMetadata) throws IOException
{
final SegmentTransactionalInsertAction action = new SegmentTransactionalInsertAction(segments, null, null);
return toolbox.getTaskActionClient().submit(action).isSuccess();
}
};
final SegmentsAndMetadata published;
final long publishTimeout = ingestionSchema.getTuningConfig().getPublishTimeout();
if (publishTimeout == 0) {
published = driver.publish(
final SegmentsAndMetadata published = awaitPublish(
driver.publishAll(
publisher,
committerSupplier.get(),
sequenceNameToShardSpecMap.keySet()
).get();
} else {
published = driver.publish(
publisher,
committerSupplier.get(),
sequenceNameToShardSpecMap.keySet()
).get(publishTimeout, TimeUnit.MILLISECONDS);
}
committerSupplier.get()
),
publishTimeout
);
if (published == null) {
log.error("Failed to publish segments, aborting!");
@ -545,11 +733,29 @@ public class IndexTask extends AbstractTask
}
}
private Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox toolbox, DataSchema dataSchema)
private static SegmentsAndMetadata awaitPublish(
ListenableFuture<SegmentsAndMetadata> publishFuture,
long publishTimeout
)
throws ExecutionException, InterruptedException, TimeoutException
{
if (publishTimeout == 0) {
return publishFuture.get();
} else {
return publishFuture.get(publishTimeout, TimeUnit.MILLISECONDS);
}
}
private static Appenderator newAppenderator(
FireDepartmentMetrics metrics,
TaskToolbox toolbox,
DataSchema dataSchema,
IndexTuningConfig tuningConfig
)
{
return Appenderators.createOffline(
dataSchema,
ingestionSchema.getTuningConfig().withBasePersistDirectory(toolbox.getPersistDir()),
tuningConfig.withBasePersistDirectory(toolbox.getPersistDir()),
metrics,
toolbox.getSegmentPusher(),
toolbox.getObjectMapper(),
@ -558,7 +764,7 @@ public class IndexTask extends AbstractTask
);
}
private AppenderatorDriver newDriver(
private static AppenderatorDriver newDriver(
final Appenderator appenderator,
final TaskToolbox toolbox,
final SegmentAllocator segmentAllocator,
@ -575,6 +781,38 @@ public class IndexTask extends AbstractTask
);
}
/**
* This interface represents a map of (Interval, ShardSpec) and is used for easy shardSpec generation. The most
* important method is {@link #updateShardSpec(Interval)}, which updates the map according to the type of shardSpec.
*/
private interface ShardSpecs
{
/**
* Return the key set of the underlying map.
*
* @return a set of intervals
*/
Collection<Interval> getIntervals();
/**
* Return a shardSpec for the given interval and input row.
*
* @param interval interval for shardSpec
* @param row input row
* @return a shardSpec
*/
ShardSpec getShardSpec(Interval interval, InputRow row);
/**
* Update the shardSpec of the given interval. When the type of shardSpecs is extendable, this method must update
* the shardSpec properly. For example, if {@link NumberedShardSpec} is used, an implementation of this method
* may replace the shardSpec of the given interval with a new one having a greater partitionNum.
*
* @param interval interval for shardSpec to be updated
*/
void updateShardSpec(Interval interval);
}
public static class IndexIngestionSpec extends IngestionSpec<IndexIOConfig, IndexTuningConfig>
{
private final DataSchema dataSchema;
@ -592,10 +830,7 @@ public class IndexTask extends AbstractTask
this.dataSchema = dataSchema;
this.ioConfig = ioConfig;
this.tuningConfig = tuningConfig == null
?
new IndexTuningConfig(null, null, null, null, null, null, null, null, (File) null)
: tuningConfig;
this.tuningConfig = tuningConfig == null ? new IndexTuningConfig() : tuningConfig;
}
@Override
@ -654,11 +889,13 @@ public class IndexTask extends AbstractTask
@JsonTypeName("index")
public static class IndexTuningConfig implements TuningConfig, AppenderatorConfig
{
private static final int DEFAULT_MAX_ROWS_IN_MEMORY = 75000;
private static final int DEFAULT_MAX_ROWS_IN_MEMORY = 75_000;
private static final int DEFAULT_MAX_TOTAL_ROWS = 150_000;
private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec();
private static final int DEFAULT_MAX_PENDING_PERSISTS = 0;
private static final boolean DEFAULT_BUILD_V9_DIRECTLY = true;
private static final boolean DEFAULT_FORCE_EXTENDABLE_SHARD_SPECS = false;
private static final boolean DEFAULT_GUARANTEE_ROLLUP = false;
private static final boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = false;
private static final long DEFAULT_PUBLISH_TIMEOUT = 0;
@ -666,11 +903,13 @@ public class IndexTask extends AbstractTask
private final Integer targetPartitionSize;
private final int maxRowsInMemory;
private final int maxTotalRows;
private final Integer numShards;
private final IndexSpec indexSpec;
private final File basePersistDirectory;
private final int maxPendingPersists;
private final boolean forceExtendableShardSpecs;
private final boolean forceGuaranteedRollup;
private final boolean reportParseExceptions;
private final long publishTimeout;
@ -678,6 +917,7 @@ public class IndexTask extends AbstractTask
public IndexTuningConfig(
@JsonProperty("targetPartitionSize") @Nullable Integer targetPartitionSize,
@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
@JsonProperty("maxTotalRows") @Nullable Integer maxTotalRows,
@JsonProperty("rowFlushBoundary") @Nullable Integer rowFlushBoundary_forBackCompatibility, // DEPRECATED
@JsonProperty("numShards") @Nullable Integer numShards,
@JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
@ -685,6 +925,7 @@ public class IndexTask extends AbstractTask
// This parameter is left for compatibility when reading existing JSONs, to be removed in Druid 0.12.
@JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly,
@JsonProperty("forceExtendableShardSpecs") @Nullable Boolean forceExtendableShardSpecs,
@JsonProperty("forceGuaranteedRollup") @Nullable Boolean forceGuaranteedRollup,
@JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions,
@JsonProperty("publishTimeout") @Nullable Long publishTimeout
)
@ -692,23 +933,32 @@ public class IndexTask extends AbstractTask
this(
targetPartitionSize,
maxRowsInMemory != null ? maxRowsInMemory : rowFlushBoundary_forBackCompatibility,
maxTotalRows,
numShards,
indexSpec,
maxPendingPersists,
forceExtendableShardSpecs,
forceGuaranteedRollup,
reportParseExceptions,
publishTimeout,
null
);
}
private IndexTuningConfig()
{
this(null, null, null, null, null, null, null, null, null, null, null);
}
private IndexTuningConfig(
@Nullable Integer targetPartitionSize,
@Nullable Integer maxRowsInMemory,
@Nullable Integer maxTotalRows,
@Nullable Integer numShards,
@Nullable IndexSpec indexSpec,
@Nullable Integer maxPendingPersists,
@Nullable Boolean forceExtendableShardSpecs,
@Nullable Boolean forceGuaranteedRollup,
@Nullable Boolean reportParseExceptions,
@Nullable Long publishTimeout,
@Nullable File basePersistDirectory
@ -725,17 +975,26 @@ public class IndexTask extends AbstractTask
? DEFAULT_TARGET_PARTITION_SIZE
: targetPartitionSize);
this.maxRowsInMemory = maxRowsInMemory == null ? DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
this.maxTotalRows = maxTotalRows == null
? DEFAULT_MAX_TOTAL_ROWS
: maxTotalRows;
this.numShards = numShards == null || numShards.equals(-1) ? null : numShards;
this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec;
this.maxPendingPersists = maxPendingPersists == null ? DEFAULT_MAX_PENDING_PERSISTS : maxPendingPersists;
this.forceExtendableShardSpecs = forceExtendableShardSpecs == null
? DEFAULT_FORCE_EXTENDABLE_SHARD_SPECS
: forceExtendableShardSpecs;
this.forceGuaranteedRollup = forceGuaranteedRollup == null ? DEFAULT_GUARANTEE_ROLLUP : forceGuaranteedRollup;
this.reportParseExceptions = reportParseExceptions == null
? DEFAULT_REPORT_PARSE_EXCEPTIONS
: reportParseExceptions;
this.publishTimeout = publishTimeout == null ? DEFAULT_PUBLISH_TIMEOUT : publishTimeout;
this.basePersistDirectory = basePersistDirectory;
Preconditions.checkArgument(
!(this.forceExtendableShardSpecs && this.forceGuaranteedRollup),
"Perfect rollup cannot be guaranteed with extendable shardSpecs"
);
}
public IndexTuningConfig withBasePersistDirectory(File dir)
@ -743,10 +1002,12 @@ public class IndexTask extends AbstractTask
return new IndexTuningConfig(
targetPartitionSize,
maxRowsInMemory,
maxTotalRows,
numShards,
indexSpec,
maxPendingPersists,
forceExtendableShardSpecs,
forceGuaranteedRollup,
reportParseExceptions,
publishTimeout,
dir
@ -766,6 +1027,12 @@ public class IndexTask extends AbstractTask
return maxRowsInMemory;
}
@JsonProperty
public int getMaxTotalRows()
{
return maxTotalRows;
}
@JsonProperty
public Integer getNumShards()
{
@ -802,6 +1069,18 @@ public class IndexTask extends AbstractTask
return true;
}
@JsonProperty
public boolean isForceExtendableShardSpecs()
{
return forceExtendableShardSpecs;
}
@JsonProperty
public boolean isForceGuaranteedRollup()
{
return forceGuaranteedRollup;
}
@JsonProperty
@Override
public boolean isReportParseExceptions()
@ -809,12 +1088,6 @@ public class IndexTask extends AbstractTask
return reportParseExceptions;
}
@JsonProperty
public boolean isForceExtendableShardSpecs()
{
return forceExtendableShardSpecs;
}
@JsonProperty
public long getPublishTimeout()
{


@ -39,6 +39,7 @@ import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
import io.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import io.druid.indexing.overlord.SegmentPublishResult;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.granularity.Granularities;
@ -105,7 +106,7 @@ public class IndexTaskTest
0
);
private final IndexSpec indexSpec;
private static final IndexSpec indexSpec = new IndexSpec();
private final ObjectMapper jsonMapper;
private IndexMergerV9 indexMergerV9;
private IndexIO indexIO;
@ -113,7 +114,6 @@ public class IndexTaskTest
public IndexTaskTest()
{
indexSpec = new IndexSpec();
TestUtils testUtils = new TestUtils();
jsonMapper = testUtils.getTestObjectMapper();
indexMergerV9 = testUtils.getTestIndexMergerV9();
@ -136,7 +136,13 @@ public class IndexTaskTest
IndexTask indexTask = new IndexTask(
null,
null,
createIngestionSpec(tmpDir, null, null, 2, null, false, false),
createIngestionSpec(
tmpDir,
null,
null,
createTuningConfig(2, null, false, true),
false
),
null,
jsonMapper
);
@ -174,7 +180,13 @@ public class IndexTaskTest
IndexTask indexTask = new IndexTask(
null,
null,
createIngestionSpec(tmpDir, null, null, 2, null, true, false),
createIngestionSpec(
tmpDir,
null,
null,
createTuningConfig(2, null, true, false),
false
),
null,
jsonMapper
);
@ -187,13 +199,11 @@ public class IndexTaskTest
Assert.assertEquals(new Interval("2014/P1D"), segments.get(0).getInterval());
Assert.assertTrue(segments.get(0).getShardSpec().getClass().equals(NumberedShardSpec.class));
Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum());
Assert.assertEquals(2, ((NumberedShardSpec) segments.get(0).getShardSpec()).getPartitions());
Assert.assertEquals("test", segments.get(1).getDataSource());
Assert.assertEquals(new Interval("2014/P1D"), segments.get(1).getInterval());
Assert.assertTrue(segments.get(1).getShardSpec().getClass().equals(NumberedShardSpec.class));
Assert.assertEquals(1, segments.get(1).getShardSpec().getPartitionNum());
Assert.assertEquals(2, ((NumberedShardSpec) segments.get(1).getShardSpec()).getPartitions());
}
@Test
@ -219,9 +229,7 @@ public class IndexTaskTest
Granularities.MINUTE,
Collections.singletonList(new Interval("2014/2015"))
),
10,
null,
false,
createTuningConfig(10, null, false, true),
false
),
null,
@ -256,9 +264,7 @@ public class IndexTaskTest
Granularities.HOUR,
Collections.singletonList(new Interval("2015-03-01T08:00:00Z/2015-03-01T09:00:00Z"))
),
50,
null,
false,
createTuningConfig(50, null, false, true),
false
),
null,
@ -285,7 +291,13 @@ public class IndexTaskTest
IndexTask indexTask = new IndexTask(
null,
null,
createIngestionSpec(tmpDir, null, null, null, 1, false, false),
createIngestionSpec(
tmpDir,
null,
null,
createTuningConfig(null, 1, false, true),
false
),
null,
jsonMapper
);
@ -316,7 +328,13 @@ public class IndexTaskTest
IndexTask indexTask = new IndexTask(
null,
null,
createIngestionSpec(tmpDir, null, null, 2, null, false, true),
createIngestionSpec(
tmpDir,
null,
null,
createTuningConfig(2, null, false, false),
true
),
null,
jsonMapper
);
@ -360,9 +378,7 @@ public class IndexTaskTest
Granularities.MINUTE,
null
),
2,
null,
false,
createTuningConfig(2, null, false, true),
false
),
null,
@ -423,9 +439,7 @@ public class IndexTaskTest
0
),
null,
2,
null,
false,
createTuningConfig(2, null, false, true),
false
),
null,
@ -475,9 +489,7 @@ public class IndexTaskTest
0
),
null,
2,
null,
false,
createTuningConfig(2, null, false, true),
false
),
null,
@ -493,6 +505,157 @@ public class IndexTaskTest
Assert.assertEquals(new Interval("2014/P1D"), segments.get(0).getInterval());
}
@Test
public void testWithSmallMaxTotalRows() throws Exception
{
File tmpDir = temporaryFolder.newFolder();
File tmpFile = File.createTempFile("druid", "index", tmpDir);
try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
writer.write("2014-01-01T00:00:10Z,a,1\n");
writer.write("2014-01-01T00:00:10Z,b,2\n");
writer.write("2014-01-01T00:00:10Z,c,3\n");
writer.write("2014-01-01T01:00:20Z,a,1\n");
writer.write("2014-01-01T01:00:20Z,b,2\n");
writer.write("2014-01-01T01:00:20Z,c,3\n");
writer.write("2014-01-01T02:00:30Z,a,1\n");
writer.write("2014-01-01T02:00:30Z,b,2\n");
writer.write("2014-01-01T02:00:30Z,c,3\n");
}
IndexTask indexTask = new IndexTask(
null,
null,
createIngestionSpec(
tmpDir,
null,
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.MINUTE,
null
),
createTuningConfig(2, 2, 2, null, false, false, true),
false
),
null,
jsonMapper
);
final List<DataSegment> segments = runTask(indexTask);
Assert.assertEquals(6, segments.size());
for (int i = 0; i < 6; i++) {
final DataSegment segment = segments.get(i);
final Interval expectedInterval = new Interval(StringUtils.format("2014-01-01T0%d/PT1H", (i / 2)));
final int expectedPartitionNum = i % 2;
Assert.assertEquals("test", segment.getDataSource());
Assert.assertEquals(expectedInterval, segment.getInterval());
Assert.assertTrue(segment.getShardSpec().getClass().equals(NumberedShardSpec.class));
Assert.assertEquals(expectedPartitionNum, segment.getShardSpec().getPartitionNum());
}
}
@Test
public void testPerfectRollup() throws Exception
{
File tmpDir = temporaryFolder.newFolder();
File tmpFile = File.createTempFile("druid", "index", tmpDir);
populateRollupTestData(tmpFile);
IndexTask indexTask = new IndexTask(
null,
null,
createIngestionSpec(
tmpDir,
null,
new UniformGranularitySpec(
Granularities.DAY,
Granularities.DAY,
true,
null
),
createTuningConfig(3, 2, 2, null, false, true, true),
false
),
null,
jsonMapper
);
final List<DataSegment> segments = runTask(indexTask);
Assert.assertEquals(3, segments.size());
for (int i = 0; i < 3; i++) {
final DataSegment segment = segments.get(i);
final Interval expectedInterval = new Interval("2014-01-01T00:00:00.000Z/2014-01-02T00:00:00.000Z");
Assert.assertEquals("test", segment.getDataSource());
Assert.assertEquals(expectedInterval, segment.getInterval());
Assert.assertTrue(segment.getShardSpec().getClass().equals(HashBasedNumberedShardSpec.class));
Assert.assertEquals(i, segment.getShardSpec().getPartitionNum());
}
}
@Test
public void testBestEffortRollup() throws Exception
{
File tmpDir = temporaryFolder.newFolder();
File tmpFile = File.createTempFile("druid", "index", tmpDir);
populateRollupTestData(tmpFile);
IndexTask indexTask = new IndexTask(
null,
null,
createIngestionSpec(
tmpDir,
null,
new UniformGranularitySpec(
Granularities.DAY,
Granularities.DAY,
true,
null
),
createTuningConfig(3, 2, 2, null, false, false, true),
false
),
null,
jsonMapper
);
final List<DataSegment> segments = runTask(indexTask);
Assert.assertEquals(5, segments.size());
for (int i = 0; i < 5; i++) {
final DataSegment segment = segments.get(i);
final Interval expectedInterval = new Interval("2014-01-01T00:00:00.000Z/2014-01-02T00:00:00.000Z");
Assert.assertEquals("test", segment.getDataSource());
Assert.assertEquals(expectedInterval, segment.getInterval());
Assert.assertTrue(segment.getShardSpec().getClass().equals(NumberedShardSpec.class));
Assert.assertEquals(i, segment.getShardSpec().getPartitionNum());
}
}
private static void populateRollupTestData(File tmpFile) throws IOException
{
try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
writer.write("2014-01-01T00:00:10Z,a,1\n");
writer.write("2014-01-01T01:00:20Z,a,1\n");
writer.write("2014-01-01T00:00:10Z,b,2\n");
writer.write("2014-01-01T00:00:10Z,c,3\n");
writer.write("2014-01-01T01:00:20Z,b,2\n");
writer.write("2014-01-01T02:00:30Z,a,1\n");
writer.write("2014-01-01T02:00:30Z,b,2\n");
writer.write("2014-01-01T01:00:20Z,c,3\n");
writer.write("2014-01-01T02:00:30Z,c,3\n");
}
}
@Test
public void testIgnoreParseException() throws Exception
{
@ -527,11 +690,8 @@ public class IndexTaskTest
0
),
null,
2,
null,
false,
false,
false // ignore parse exception
createTuningConfig(2, null, null, null, false, false, false), // ignore parse exception
false
);
IndexTask indexTask = new IndexTask(
@ -584,11 +744,8 @@ public class IndexTaskTest
0
),
null,
2,
null,
false,
false,
true // report parse exception
createTuningConfig(2, null, null, null, false, false, true), // report parse exception
false
);
IndexTask indexTask = new IndexTask(
@ -647,11 +804,8 @@ public class IndexTaskTest
0
),
null,
2,
null,
false,
false,
true // report parse exception
createTuningConfig(2, 1, null, null, false, true, true), // report parse exception
false
);
IndexTask indexTask = new IndexTask(
@ -668,6 +822,10 @@ public class IndexTaskTest
Assert.assertEquals(2, segments.size());
Assert.assertNotEquals(segments.get(0), segments.get(1));
for (DataSegment segment : segments) {
System.out.println(segment.getDimensions());
}
for (int i = 0; i < 2; i++) {
final DataSegment segment = segments.get(i);
final Set<String> dimensions = new HashSet<>(segment.getDimensions());
@ -717,11 +875,8 @@ public class IndexTaskTest
0
),
null,
2,
null,
false,
false,
true // report parse exception
createTuningConfig(2, null, null, null, false, false, true), // report parse exception
false
);
IndexTask indexTask = new IndexTask(
@ -820,34 +975,9 @@ public class IndexTaskTest
File baseDir,
ParseSpec parseSpec,
GranularitySpec granularitySpec,
Integer targetPartitionSize,
Integer numShards,
boolean forceExtendableShardSpecs,
IndexTuningConfig tuningConfig,
boolean appendToExisting
)
{
return createIngestionSpec(
baseDir,
parseSpec,
granularitySpec,
targetPartitionSize,
numShards,
forceExtendableShardSpecs,
appendToExisting,
true
);
}
private IndexTask.IndexIngestionSpec createIngestionSpec(
File baseDir,
ParseSpec parseSpec,
GranularitySpec granularitySpec,
Integer targetPartitionSize,
Integer numShards,
boolean forceExtendableShardSpecs,
boolean appendToExisting,
boolean reportParseException
)
{
return new IndexTask.IndexIngestionSpec(
new DataSchema(
@ -874,20 +1004,54 @@ public class IndexTaskTest
baseDir,
"druid*",
null
), appendToExisting
),
new IndexTask.IndexTuningConfig(
appendToExisting
),
tuningConfig
);
}
private static IndexTuningConfig createTuningConfig(
Integer targetPartitionSize,
Integer numShards,
boolean forceExtendableShardSpecs,
boolean forceGuaranteedRollup
)
{
return createTuningConfig(
targetPartitionSize,
1,
null,
numShards,
forceExtendableShardSpecs,
forceGuaranteedRollup,
true
);
}
private static IndexTuningConfig createTuningConfig(
Integer targetPartitionSize,
Integer maxRowsInMemory,
Integer maxTotalRows,
Integer numShards,
boolean forceExtendableShardSpecs,
boolean forceGuaranteedRollup,
boolean reportParseException
)
{
return new IndexTask.IndexTuningConfig(
targetPartitionSize,
maxRowsInMemory,
maxTotalRows,
null,
numShards,
indexSpec,
null,
true,
forceExtendableShardSpecs,
forceGuaranteedRollup,
reportParseException,
null
)
);
}
}


@ -185,7 +185,7 @@ public class TaskSerdeTest
jsonMapper
),
new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true),
new IndexTask.IndexTuningConfig(10000, 10, 9999, null, indexSpec, 3, true, true, true, null)
new IndexTask.IndexTuningConfig(10000, 10, null, 9999, null, indexSpec, 3, true, true, false, null, null)
),
null,
jsonMapper
@ -248,7 +248,7 @@ public class TaskSerdeTest
jsonMapper
),
new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true),
new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true, null)
new IndexTask.IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null)
),
null,
jsonMapper


@ -655,7 +655,7 @@ public class TaskLifecycleTest
mapper
),
new IndexTask.IndexIOConfig(new MockFirehoseFactory(false), false),
new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true, null)
new IndexTask.IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null)
),
null,
MAPPER
@ -713,7 +713,7 @@ public class TaskLifecycleTest
mapper
),
new IndexTask.IndexIOConfig(new MockExceptionalFirehoseFactory(), false),
new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true, null)
new IndexTask.IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null)
),
null,
MAPPER
@ -1078,7 +1078,7 @@ public class TaskLifecycleTest
mapper
),
new IndexTask.IndexIOConfig(new MockFirehoseFactory(false), false),
new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, null, false, null, null, null)
new IndexTask.IndexTuningConfig(10000, 10, null, null, null, indexSpec, null, false, null, null, null, null)
),
null,
MAPPER


@ -134,7 +134,8 @@ public class TestQueryHelper
ImmutableList.<AggregatorFactory>of(
new LongSumAggregatorFactory("rows", "count")
)
).granularity(Granularities.ALL)
)
.granularity(Granularities.ALL)
.intervals(interval)
.build();


@ -62,7 +62,8 @@ public interface Appenderator extends QuerySegmentWalker, Closeable
* Committer is guaranteed to be *created* synchronously with the call to add, but will actually be used
* asynchronously.
* <p>
* The add, clear, persist, persistAll, and push methods should all be called from the same thread.
* The add, clear, persist, persistAll, and push methods should all be called from the same thread to keep the
* metadata committed by Committer in sync.
*
* @param identifier the segment into which this row should be added
* @param row the row to add
@ -92,12 +93,20 @@ public interface Appenderator extends QuerySegmentWalker, Closeable
*/
int getRowCount(final SegmentIdentifier identifier);
/**
* Returns the number of total rows in this appenderator.
*
* @return total number of rows
*/
int getTotalRowCount();
/**
* Drop all in-memory and on-disk data, and forget any previously-remembered commit metadata. This could be useful if,
* for some reason, rows have been added that we do not actually want to hand off. Blocks until all data has been
* cleared. This may take some time, since all pending persists must finish first.
*
* The add, clear, persist, persistAll, and push methods should all be called from the same thread.
* The add, clear, persist, persistAll, and push methods should all be called from the same thread to keep the
* metadata committed by Committer in sync.
*/
void clear() throws InterruptedException;
@ -121,7 +130,8 @@ public interface Appenderator extends QuerySegmentWalker, Closeable
* persist, but will actually be used asynchronously. Any metadata returned by the committer will be associated with
* the data persisted to disk.
* <p>
* The add, clear, persist, persistAll, and push methods should all be called from the same thread.
* The add, clear, persist, persistAll, and push methods should all be called from the same thread to keep the
* metadata committed by Committer in sync.
*
* @param identifiers segment identifiers to be persisted
* @param committer a committer associated with all data that has been added to segments of the given identifiers so
@ -138,7 +148,8 @@ public interface Appenderator extends QuerySegmentWalker, Closeable
* be used asynchronously. Any metadata returned by the committer will be associated with the data persisted to
* disk.
* <p>
* The add, clear, persist, persistAll, and push methods should all be called from the same thread.
* The add, clear, persist, persistAll, and push methods should all be called from the same thread to keep the
* metadata committed by Committer in sync.
*
* @param committer a committer associated with all data that has been added so far
*
@ -155,7 +166,8 @@ public interface Appenderator extends QuerySegmentWalker, Closeable
* <p>
* After this method is called, you cannot add new data to any segments that were previously under construction.
* <p>
* The add, clear, persist, persistAll, and push methods should all be called from the same thread.
* The add, clear, persist, persistAll, and push methods should all be called from the same thread to keep the
* metadata committed by Committer in sync.
*
* @param identifiers list of segments to push
* @param committer a committer associated with all data that has been added so far
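Taken together, the javadoc changes above pin down the Appenderator threading contract: every mutating call shares one thread so the metadata captured by each Committer stays consistent with the rows that were actually added. A minimal usage sketch under that contract (the setup, identifiers, rows, and committer are assumed here, not part of this patch):

    import com.google.common.base.Supplier;
    import io.druid.data.input.Committer;
    import io.druid.data.input.InputRow;
    import io.druid.segment.realtime.appenderator.Appenderator;
    import io.druid.segment.realtime.appenderator.SegmentIdentifier;
    import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;

    import java.util.List;

    class AppenderatorUsageSketch
    {
      // All calls below happen on the calling thread; only the returned futures complete asynchronously.
      SegmentsAndMetadata ingestAndPush(
          Appenderator appenderator,
          SegmentIdentifier identifier,
          List<InputRow> rows,
          Supplier<Committer> committerSupplier
      ) throws Exception
      {
        appenderator.startJob();                                // restores previously committed metadata, if any
        for (InputRow row : rows) {
          appenderator.add(identifier, row, committerSupplier); // counted by getRowCount()/getTotalRowCount()
        }
        appenderator.persistAll(committerSupplier.get()).get(); // spill in-memory rows; total row count is unchanged
        return appenderator.push(appenderator.getSegments(), committerSupplier.get()).get();
      }
    }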


@ -57,7 +57,6 @@ import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
@ -142,9 +141,9 @@ public class AppenderatorDriver implements Closeable
{
handoffNotifier.start();
final FiniteAppenderatorDriverMetadata metadata = objectMapper.convertValue(
final AppenderatorDriverMetadata metadata = objectMapper.convertValue(
appenderator.startJob(),
FiniteAppenderatorDriverMetadata.class
AppenderatorDriverMetadata.class
);
log.info("Restored metadata[%s].", metadata);
@ -215,12 +214,12 @@ public class AppenderatorDriver implements Closeable
Preconditions.checkNotNull(sequenceName, "sequenceName");
Preconditions.checkNotNull(committerSupplier, "committerSupplier");
final SegmentIdentifier identifier = getSegment(row.getTimestamp(), sequenceName);
final SegmentIdentifier identifier = getSegment(row, sequenceName);
if (identifier != null) {
try {
final int numRows = appenderator.add(identifier, row, wrapCommitterSupplier(committerSupplier));
return AppenderatorDriverAddResult.ok(identifier, numRows);
final int numRowsInMemory = appenderator.add(identifier, row, wrapCommitterSupplier(committerSupplier));
return AppenderatorDriverAddResult.ok(identifier, numRowsInMemory, appenderator.getTotalRowCount());
}
catch (SegmentNotWritableException e) {
throw new ISE(e, "WTF?! Segment[%s] not writable when it should have been.", identifier);
@ -265,7 +264,7 @@ public class AppenderatorDriver implements Closeable
*
* @return null if the input segmentsAndMetadata is null. Otherwise, a {@link ListenableFuture} for the submitted task
* which returns {@link SegmentsAndMetadata} containing the segments successfully handed off and the caller
* metadata stored in {@link AppenderatorDriverMetadata}
* of the caller of {@link AppenderatorDriverMetadata}
*/
public ListenableFuture<SegmentsAndMetadata> registerHandoff(SegmentsAndMetadata segmentsAndMetadata)
{
@ -281,7 +280,7 @@ public class AppenderatorDriver implements Closeable
return Futures.immediateFuture(
new SegmentsAndMetadata(
segmentsAndMetadata.getSegments(),
((FiniteAppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata())
((AppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata())
.getCallerMetadata()
)
);
@ -317,7 +316,7 @@ public class AppenderatorDriver implements Closeable
resultFuture.set(
new SegmentsAndMetadata(
segmentsAndMetadata.getSegments(),
((FiniteAppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata())
((AppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata())
.getCallerMetadata()
)
);
@ -372,23 +371,24 @@ public class AppenderatorDriver implements Closeable
/**
* Return a segment usable for "timestamp". May return null if no segment can be allocated.
*
* @param timestamp data timestamp
* @param row input row
* @param sequenceName sequenceName for potential segment allocation
*
* @return identifier, or null
*
* @throws IOException if an exception occurs while allocating a segment
*/
private SegmentIdentifier getSegment(final DateTime timestamp, final String sequenceName) throws IOException
private SegmentIdentifier getSegment(final InputRow row, final String sequenceName) throws IOException
{
synchronized (activeSegments) {
final DateTime timestamp = row.getTimestamp();
final SegmentIdentifier existing = getActiveSegment(timestamp, sequenceName);
if (existing != null) {
return existing;
} else {
// Allocate new segment.
final SegmentIdentifier newSegment = segmentAllocator.allocate(
timestamp,
row,
sequenceName,
lastSegmentIds.get(sequenceName)
);
@ -437,6 +437,27 @@ public class AppenderatorDriver implements Closeable
}
}
/**
* Publish all pending segments.
*
* @param publisher segment publisher
* @param committer committer
*
* @return a {@link ListenableFuture} for the publish task which removes published {@code sequenceNames} from
* {@code activeSegments} and {@code publishPendingSegments}
*/
public ListenableFuture<SegmentsAndMetadata> publishAll(
final TransactionalSegmentPublisher publisher,
final Committer committer
)
{
final List<String> sequenceNames;
synchronized (activeSegments) {
sequenceNames = ImmutableList.copyOf(publishPendingSegments.keySet());
}
return publish(publisher, committer, sequenceNames);
}
/**
* Execute a task in the background to publish all segments corresponding to the given sequence names. The task
* internally pushes the segments to deep storage first, and then publishes the metadata to the metadata storage.
@ -446,7 +467,7 @@ public class AppenderatorDriver implements Closeable
* @param sequenceNames a collection of sequence names to be published
*
* @return a {@link ListenableFuture} for the submitted task which removes published {@code sequenceNames} from
* {@code activeSegments} and {@code publishPendingSegments}.
* {@code activeSegments} and {@code publishPendingSegments}
*/
public ListenableFuture<SegmentsAndMetadata> publish(
final TransactionalSegmentPublisher publisher,
@ -520,15 +541,11 @@ public class AppenderatorDriver implements Closeable
final List<SegmentIdentifier> segmentIdentifiers
)
{
return publishExecutor.submit(
() -> {
long nTry = 0;
while (true) {
try {
log.info("Pushing segments: [%s]", Joiner.on(", ").join(segmentIdentifiers));
final SegmentsAndMetadata segmentsAndMetadata = appenderator.push(segmentIdentifiers, wrappedCommitter)
.get();
return Futures.transform(
appenderator.push(segmentIdentifiers, wrappedCommitter),
(Function<SegmentsAndMetadata, SegmentsAndMetadata>) segmentsAndMetadata -> {
// Sanity check
final Set<SegmentIdentifier> pushedSegments = segmentsAndMetadata.getSegments().stream()
.map(SegmentIdentifier::fromDataSegment)
@ -541,22 +558,23 @@ public class AppenderatorDriver implements Closeable
);
}
if (segmentsAndMetadata.getSegments().isEmpty()) {
log.info("Nothing to publish, skipping publish step.");
} else {
log.info(
"Publishing segments with commitMetadata[%s]: [%s]",
segmentsAndMetadata.getCommitMetadata(),
Joiner.on(", ").join(segmentsAndMetadata.getSegments())
);
if (segmentsAndMetadata.getSegments().isEmpty()) {
log.info("Nothing to publish, skipping publish step.");
} else {
try {
final boolean published = publisher.publishSegments(
ImmutableSet.copyOf(segmentsAndMetadata.getSegments()),
((FiniteAppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata()).getCallerMetadata()
((AppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata()).getCallerMetadata()
);
if (published) {
log.info("Published segments, awaiting handoff.");
log.info("Published segments.");
} else {
log.info("Transaction failure while publishing segments, checking if someone else beat us to it.");
if (usedSegmentChecker.findUsedSegments(pushedSegments)
@ -568,19 +586,14 @@ public class AppenderatorDriver implements Closeable
}
}
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
return segmentsAndMetadata;
}
catch (InterruptedException e) {
throw e;
}
catch (Exception e) {
final long sleepMillis = computeNextRetrySleep(++nTry);
log.warn(e, "Failed publish (try %d), retrying in %,dms.", nTry, sleepMillis);
Thread.sleep(sleepMillis);
}
}
}
},
publishExecutor
);
}
@ -607,9 +620,9 @@ public class AppenderatorDriver implements Closeable
private WrappedCommitter wrapCommitter(final Committer committer)
{
final FiniteAppenderatorDriverMetadata wrappedMetadata;
final AppenderatorDriverMetadata wrappedMetadata;
synchronized (activeSegments) {
wrappedMetadata = new FiniteAppenderatorDriverMetadata(
wrappedMetadata = new AppenderatorDriverMetadata(
ImmutableMap.copyOf(
Maps.transformValues(
activeSegments,
@ -644,12 +657,4 @@ public class AppenderatorDriver implements Closeable
}
};
}
private static long computeNextRetrySleep(final long nTry)
{
final long baseSleepMillis = 1000;
final long maxSleepMillis = 60000;
final double fuzzyMultiplier = Math.min(Math.max(1 + 0.2 * new Random().nextGaussian(), 0), 2);
return (long) (Math.min(maxSleepMillis, baseSleepMillis * Math.pow(2, nTry)) * fuzzyMultiplier);
}
}
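With the retry loop gone from the driver (pushes are now retried inside the appenderator), the publish path is a plain future chain. A hedged sketch of how a caller might combine the new publishAll() with the existing registerHandoff(); everything other than the AppenderatorDriver API shown above is assumed for illustration:

    import io.druid.data.input.Committer;
    import io.druid.segment.realtime.appenderator.AppenderatorDriver;
    import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
    import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;

    class PublishAllSketch
    {
      SegmentsAndMetadata publishAndAwaitHandoff(
          AppenderatorDriver driver,
          TransactionalSegmentPublisher publisher,
          Committer committer
      ) throws Exception
      {
        // push pending segments to deep storage and publish their metadata transactionally
        final SegmentsAndMetadata published = driver.publishAll(publisher, committer).get();
        // register the published segments for handoff; the future resolves once handoff completes
        return driver.registerHandoff(published).get();
      }
    }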


@ -32,21 +32,31 @@ public class AppenderatorDriverAddResult
{
private final SegmentIdentifier segmentIdentifier;
private final int numRowsInSegment;
private final long totalNumRowsInAppenderator;
public static AppenderatorDriverAddResult ok(SegmentIdentifier segmentIdentifier, int numRowsInSegment)
public static AppenderatorDriverAddResult ok(
SegmentIdentifier segmentIdentifier,
int numRowsInSegment,
long totalNumRowsInAppenderator
)
{
return new AppenderatorDriverAddResult(segmentIdentifier, numRowsInSegment);
return new AppenderatorDriverAddResult(segmentIdentifier, numRowsInSegment, totalNumRowsInAppenderator);
}
public static AppenderatorDriverAddResult fail()
{
return new AppenderatorDriverAddResult(null, 0);
return new AppenderatorDriverAddResult(null, 0, 0);
}
private AppenderatorDriverAddResult(@Nullable SegmentIdentifier segmentIdentifier, int numRowsInSegment)
private AppenderatorDriverAddResult(
@Nullable SegmentIdentifier segmentIdentifier,
int numRowsInSegment,
long totalNumRowsInAppenderator
)
{
this.segmentIdentifier = segmentIdentifier;
this.numRowsInSegment = numRowsInSegment;
this.totalNumRowsInAppenderator = totalNumRowsInAppenderator;
}
public boolean isOk()
@ -63,4 +73,9 @@ public class AppenderatorDriverAddResult
{
return numRowsInSegment;
}
public long getTotalNumRowsInAppenderator()
{
return totalNumRowsInAppenderator;
}
}
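The new totalNumRowsInAppenderator field is what lets a caller decide when to publish early, which is the point of this patch. A sketch of that decision under assumed names (maxTotalRows here is a hypothetical threshold owned by the caller, e.g. a task's tuning config):

    import com.google.common.base.Supplier;
    import io.druid.data.input.Committer;
    import io.druid.data.input.InputRow;
    import io.druid.java.util.common.ISE;
    import io.druid.segment.realtime.appenderator.AppenderatorDriver;
    import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
    import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;

    class EarlyPublishSketch
    {
      void addAndMaybePublish(
          AppenderatorDriver driver,
          InputRow row,
          String sequenceName,
          Supplier<Committer> committerSupplier,
          TransactionalSegmentPublisher publisher,
          long maxTotalRows // hypothetical threshold, not part of AppenderatorDriverAddResult
      ) throws Exception
      {
        final AppenderatorDriverAddResult result = driver.add(row, sequenceName, committerSupplier);
        if (!result.isOk()) {
          throw new ISE("Could not allocate a segment for row with timestamp[%s]", row.getTimestamp());
        }
        if (result.getTotalNumRowsInAppenderator() >= maxTotalRows) {
          // publish everything accumulated so far and keep ingesting into fresh segments
          driver.publishAll(publisher, committerSupplier.get()).get();
        }
      }
    }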


@ -25,7 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import java.util.Map;
public class FiniteAppenderatorDriverMetadata
public class AppenderatorDriverMetadata
{
private final Map<String, List<SegmentIdentifier>> activeSegments;
private final Map<String, List<SegmentIdentifier>> publishPendingSegments;
@ -33,7 +33,7 @@ public class FiniteAppenderatorDriverMetadata
private final Object callerMetadata;
@JsonCreator
public FiniteAppenderatorDriverMetadata(
public AppenderatorDriverMetadata(
@JsonProperty("activeSegments") Map<String, List<SegmentIdentifier>> activeSegments,
@JsonProperty("publishPendingSegments") Map<String, List<SegmentIdentifier>> publishPendingSegments,
@JsonProperty("lastSegmentIds") Map<String, String> lastSegmentIds,
@ -73,7 +73,7 @@ public class FiniteAppenderatorDriverMetadata
@Override
public String toString()
{
return "FiniteAppenderatorDriverMetadata{" +
return "AppenderatorDriverMetadata{" +
"activeSegments=" + activeSegments +
", publishPendingSegments=" + publishPendingSegments +
", lastSegmentIds=" + lastSegmentIds +


@ -48,6 +48,7 @@ import io.druid.data.input.InputRow;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.RetryUtils;
import io.druid.java.util.common.StringUtils;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
@ -114,8 +115,11 @@ public class AppenderatorImpl implements Appenderator
private final VersionedIntervalTimeline<String, Sink> sinkTimeline = new VersionedIntervalTimeline<>(
String.CASE_INSENSITIVE_ORDER
);
private final AtomicInteger rowsCurrentlyInMemory = new AtomicInteger();
private final QuerySegmentWalker texasRanger;
// This variable is updated in add(), persist(), and drop()
private final AtomicInteger rowsCurrentlyInMemory = new AtomicInteger();
private final AtomicInteger totalRows = new AtomicInteger();
private volatile ListeningExecutorService persistExecutor = null;
private volatile ListeningExecutorService pushExecutor = null;
@ -214,7 +218,9 @@ public class AppenderatorImpl implements Appenderator
throw new SegmentNotWritableException("Attempt to add row to swapped-out sink for segment[%s].", identifier);
}
rowsCurrentlyInMemory.addAndGet(sinkRowsInMemoryAfterAdd - sinkRowsInMemoryBeforeAdd);
final int numAddedRows = sinkRowsInMemoryAfterAdd - sinkRowsInMemoryBeforeAdd;
rowsCurrentlyInMemory.addAndGet(numAddedRows);
totalRows.addAndGet(numAddedRows);
if (!sink.canAppendRow()
|| System.currentTimeMillis() > nextFlush
@ -244,6 +250,12 @@ public class AppenderatorImpl implements Appenderator
}
}
@Override
public int getTotalRowCount()
{
return totalRows.get();
}
@VisibleForTesting
int getRowsInMemory()
{
@ -352,10 +364,15 @@ public class AppenderatorImpl implements Appenderator
{
final Map<SegmentIdentifier, Integer> commitHydrants = Maps.newHashMap();
final List<Pair<FireHydrant, SegmentIdentifier>> indexesToPersist = Lists.newArrayList();
int numPersistedRows = 0;
for (SegmentIdentifier identifier : identifiers) {
final Sink sink = sinks.get(identifier);
if (sink == null) {
throw new ISE("No sink for identifier: %s", identifier);
}
final List<FireHydrant> hydrants = Lists.newArrayList(sink);
commitHydrants.put(identifier, hydrants.size());
numPersistedRows += sink.getNumRowsInMemory();
final int limit = sink.isWritable() ? hydrants.size() - 1 : hydrants.size();
@ -431,7 +448,7 @@ public class AppenderatorImpl implements Appenderator
resetNextFlush();
// NB: The rows are still in memory until they're done persisting, but we only count rows in active indexes.
rowsCurrentlyInMemory.set(0);
rowsCurrentlyInMemory.addAndGet(-numPersistedRows);
return future;
}
@ -453,7 +470,7 @@ public class AppenderatorImpl implements Appenderator
for (final SegmentIdentifier identifier : identifiers) {
final Sink sink = sinks.get(identifier);
if (sink == null) {
throw new NullPointerException("No sink for identifier: " + identifier);
throw new ISE("No sink for identifier: %s", identifier);
}
theSinks.put(identifier, sink);
sink.finishWriting();
@ -576,9 +593,14 @@ public class AppenderatorImpl implements Appenderator
tuningConfig.getIndexSpec()
);
DataSegment segment = dataSegmentPusher.push(
// Retry pushing segments because uploading to deep storage might fail, especially for cloud storage types
final DataSegment segment = RetryUtils.retry(
() -> dataSegmentPusher.push(
mergedFile,
sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes))
),
exception -> exception instanceof Exception,
5
);
objectMapper.writeValue(descriptorFile, segment);
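The retry added above, shown in isolation: RetryUtils.retry re-invokes the pusher until it succeeds or the attempt limit is hit, which covers transient deep-storage failures. The helper below is only a sketch mirroring that call; the pusher, file, and segment arguments are placeholders.

    import io.druid.java.util.common.RetryUtils;
    import io.druid.segment.loading.DataSegmentPusher;
    import io.druid.timeline.DataSegment;

    import java.io.File;

    class RetryingPushSketch
    {
      DataSegment pushWithRetry(DataSegmentPusher pusher, File mergedFile, DataSegment segment) throws Exception
      {
        return RetryUtils.retry(
            () -> pusher.push(mergedFile, segment), // the actual upload to deep storage
            e -> e instanceof Exception,            // treat any Exception as retriable, as in the patch
            5                                       // attempt limit used by the patch
        );
      }
    }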
@ -867,6 +889,7 @@ public class AppenderatorImpl implements Appenderator
// Decrement this sink's rows from rowsCurrentlyInMemory (we only count active sinks).
rowsCurrentlyInMemory.addAndGet(-sink.getNumRowsInMemory());
totalRows.addAndGet(-sink.getNumRows());
// Wait for any outstanding pushes to finish, then abandon the segment inside the persist thread.
return Futures.transform(


@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.java.util.common.StringUtils;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
@ -31,6 +32,8 @@ import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.ShardSpec;
import org.joda.time.Interval;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
@ -122,4 +125,8 @@ public class Appenderators
);
}
public static String getSequenceName(Interval interval, String version, ShardSpec shardSpec)
{
return StringUtils.format("index_%s_%s_%d", interval, version, shardSpec.getPartitionNum());
}
}
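getSequenceName() gives every (interval, version, partition) triple its own deterministic sequence name, so an indexing task can drive one appenderator sequence per shard. A small, hedged example of the shape of the result (the interval, version, and shard spec below are made up):

    import io.druid.segment.realtime.appenderator.Appenderators;
    import io.druid.timeline.partition.NumberedShardSpec;
    import org.joda.time.Interval;

    class SequenceNameSketch
    {
      String exampleSequenceName()
      {
        // produces something like "index_<interval>_<version>_0"
        return Appenderators.getSequenceName(
            Interval.parse("2017-01-01/2017-01-02"),
            "2017-07-11T00:00:00.000Z",        // typically the task lock's version string
            new NumberedShardSpec(0, 1)
        );
      }
    }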


@ -19,7 +19,7 @@
package io.druid.segment.realtime.appenderator;
import org.joda.time.DateTime;
import io.druid.data.input.InputRow;
import java.io.IOException;
@ -28,14 +28,14 @@ public interface SegmentAllocator
/**
* Allocates a new segment for a given timestamp.
*
* @param timestamp timestamp of the event which triggered this allocation request
* @param row the event which triggered this allocation request
* @param sequenceName sequenceName for this allocation
* @param previousSegmentId segment identifier returned on the previous call to allocate for your sequenceName
*
* @return the pending segment identifier, or null if it was impossible to allocate a new segment
*/
SegmentIdentifier allocate(
DateTime timestamp,
InputRow row,
String sequenceName,
String previousSegmentId
) throws IOException;
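Because SegmentAllocator now receives the whole InputRow, an allocator can partition on more than the timestamp (for example, on a dimension value). Since the interface has a single method it can be written as a lambda; the sketch below allocates one segment per hour of the row's timestamp, and the SegmentIdentifier constructor arguments are assumptions for illustration:

    import io.druid.java.util.common.granularity.Granularities;
    import io.druid.segment.realtime.appenderator.SegmentAllocator;
    import io.druid.segment.realtime.appenderator.SegmentIdentifier;
    import io.druid.timeline.partition.NumberedShardSpec;

    class SegmentAllocatorSketch
    {
      // ignores previousSegmentId and always returns partition 0 of the row's hour bucket
      static final SegmentAllocator HOURLY_ALLOCATOR = (row, sequenceName, previousSegmentId) ->
          new SegmentIdentifier(
              "example_datasource",
              Granularities.HOUR.bucket(row.getTimestamp()),
              "2017-07-11T00:00:00.000Z",
              new NumberedShardSpec(0, 0)
          );
    }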


@ -227,7 +227,7 @@ public class Sink implements Iterable<FireHydrant>
public int getNumRows()
{
synchronized (hydrantLock) {
return numRowsExcludingCurrIndex.get() + currHydrant.getIndex().size();
return numRowsExcludingCurrIndex.get() + getNumRowsInMemory();
}
}


@ -29,6 +29,9 @@ import io.druid.data.input.InputRow;
import java.util.List;
import java.util.Map;
/**
* An extendable linear shard spec. {@link #partitionNum} represents a unique id of a partition.
*/
public class LinearShardSpec implements ShardSpec
{
private int partitionNum;


@ -30,6 +30,13 @@ import io.druid.data.input.InputRow;
import java.util.List;
import java.util.Map;
/**
* An extendable linear shard spec that also carries information about core partitions. It has two fields,
* {@link #partitionNum} and {@link #partitions}, which represent the unique id of a partition and the number of core
* partitions, respectively. {@link #partitions} indicates that an atomic update is regarded as complete once that many
* partitions have been successfully published, while {@link #partitionNum} can go beyond it when some types of index
* tasks append to existing partitions.
*/
public class NumberedShardSpec implements ShardSpec
{
@JsonIgnore
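To make the javadoc concrete, here is a sketch of core versus appended partitions (the constructor arguments are assumed to be (partitionNum, partitions), matching the two fields described above):

    import io.druid.timeline.partition.NumberedShardSpec;

    class NumberedShardSpecSketch
    {
      void coreAndAppendedPartitions()
      {
        final NumberedShardSpec core0 = new NumberedShardSpec(0, 2);    // core partition 0 of 2
        final NumberedShardSpec core1 = new NumberedShardSpec(1, 2);    // core partition 1 of 2; the set is now complete
        final NumberedShardSpec appended = new NumberedShardSpec(2, 2); // appended later: partitionNum exceeds partitions
      }
    }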


@ -69,7 +69,7 @@ public class AppenderatorDriverFailTest
{
private static final String DATA_SOURCE = "foo";
private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
private static final long PUBLISH_TIMEOUT = 1000;
private static final long PUBLISH_TIMEOUT = 5000;
private static final List<InputRow> ROWS = ImmutableList.of(
new MapBasedInputRow(
@ -115,7 +115,11 @@ public class AppenderatorDriverFailTest
@Test
public void testFailDuringPersist() throws IOException, InterruptedException, TimeoutException, ExecutionException
{
expectedException.expect(TimeoutException.class);
expectedException.expect(ExecutionException.class);
expectedException.expectCause(CoreMatchers.instanceOf(ISE.class));
expectedException.expectMessage("Fail test while persisting segments"
+ "[[foo_2000-01-01T00:00:00.000Z_2000-01-01T01:00:00.000Z_abc123, "
+ "foo_2000-01-01T01:00:00.000Z_2000-01-01T02:00:00.000Z_abc123]]");
driver = new AppenderatorDriver(
createPersistFailAppenderator(),
@ -145,44 +149,14 @@ public class AppenderatorDriverFailTest
).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
}
@Test
public void testInterruptDuringPush() throws IOException, InterruptedException, TimeoutException, ExecutionException
{
expectedException.expect(ExecutionException.class);
expectedException.expectCause(CoreMatchers.instanceOf(InterruptedException.class));
driver = new AppenderatorDriver(
createPushInterruptAppenderator(),
allocator,
segmentHandoffNotifierFactory,
new NoopUsedSegmentChecker(),
OBJECT_MAPPER,
new FireDepartmentMetrics()
);
driver.startJob();
final TestCommitterSupplier<Integer> committerSupplier = new TestCommitterSupplier<>();
segmentHandoffNotifierFactory.setHandoffDelay(100);
Assert.assertNull(driver.startJob());
for (int i = 0; i < ROWS.size(); i++) {
committerSupplier.setMetadata(i + 1);
Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier).isOk());
}
driver.publish(
AppenderatorDriverTest.makeOkPublisher(),
committerSupplier.get(),
ImmutableList.of("dummy")
).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
}
@Test
public void testFailDuringPush() throws IOException, InterruptedException, TimeoutException, ExecutionException
{
expectedException.expect(TimeoutException.class);
expectedException.expect(ExecutionException.class);
expectedException.expectCause(CoreMatchers.instanceOf(ISE.class));
expectedException.expectMessage("Fail test while pushing segments"
+ "[[foo_2000-01-01T00:00:00.000Z_2000-01-01T01:00:00.000Z_abc123, "
+ "foo_2000-01-01T01:00:00.000Z_2000-01-01T02:00:00.000Z_abc123]]");
driver = new AppenderatorDriver(
createPushFailAppenderator(),
@ -281,7 +255,6 @@ public class AppenderatorDriverFailTest
}
private static class FailableAppenderator implements Appenderator
{
private final Map<SegmentIdentifier, List<InputRow>> rows = new HashMap<>();
@ -356,6 +329,12 @@ public class AppenderatorDriverFailTest
}
}
@Override
public int getTotalRowCount()
{
return numRows;
}
@Override
public void clear() throws InterruptedException
{


@ -405,13 +405,13 @@ public class AppenderatorDriverTest
@Override
public SegmentIdentifier allocate(
final DateTime timestamp,
final InputRow row,
final String sequenceName,
final String previousSegmentId
) throws IOException
{
synchronized (counters) {
final long timestampTruncated = granularity.bucketStart(timestamp).getMillis();
final long timestampTruncated = granularity.bucketStart(row.getTimestamp()).getMillis();
if (!counters.containsKey(timestampTruncated)) {
counters.put(timestampTruncated, new AtomicInteger());
}


@ -67,7 +67,7 @@ public class AppenderatorTest
@Test
public void testSimpleIngestion() throws Exception
{
try (final AppenderatorTester tester = new AppenderatorTester(2)) {
try (final AppenderatorTester tester = new AppenderatorTester(2, true)) {
final Appenderator appenderator = tester.getAppenderator();
boolean thrown;
@ -138,7 +138,7 @@ public class AppenderatorTest
@Test
public void testMaxRowsInMemory() throws Exception
{
try (final AppenderatorTester tester = new AppenderatorTester(3)) {
try (final AppenderatorTester tester = new AppenderatorTester(3, true)) {
final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0);
final Supplier<Committer> committerSupplier = new Supplier<Committer>()
@ -180,6 +180,8 @@ public class AppenderatorTest
Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(0), IR("2000", "bob", 1), committerSupplier);
Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.persist(ImmutableList.of(IDENTIFIERS.get(1)), committerSupplier.get());
Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.close();
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
}
@ -189,7 +191,7 @@ public class AppenderatorTest
public void testRestoreFromDisk() throws Exception
{
final RealtimeTuningConfig tuningConfig;
try (final AppenderatorTester tester = new AppenderatorTester(2)) {
try (final AppenderatorTester tester = new AppenderatorTester(2, true)) {
final Appenderator appenderator = tester.getAppenderator();
tuningConfig = tester.getTuningConfig();
@ -231,7 +233,7 @@ public class AppenderatorTest
appenderator.add(IDENTIFIERS.get(0), IR("2000", "bob", 5), committerSupplier);
appenderator.close();
try (final AppenderatorTester tester2 = new AppenderatorTester(2, tuningConfig.getBasePersistDirectory())) {
try (final AppenderatorTester tester2 = new AppenderatorTester(2, tuningConfig.getBasePersistDirectory(), true)) {
final Appenderator appenderator2 = tester2.getAppenderator();
Assert.assertEquals(ImmutableMap.of("eventCount", 4), appenderator2.startJob());
Assert.assertEquals(ImmutableList.of(IDENTIFIERS.get(0)), appenderator2.getSegments());
@ -240,10 +242,52 @@ public class AppenderatorTest
}
}
@Test(timeout = 10000L)
public void testTotalRowCount() throws Exception
{
try (final AppenderatorTester tester = new AppenderatorTester(3, true)) {
final Appenderator appenderator = tester.getAppenderator();
final ConcurrentMap<String, String> commitMetadata = new ConcurrentHashMap<>();
final Supplier<Committer> committerSupplier = committerSupplierFromConcurrentMap(commitMetadata);
Assert.assertEquals(0, appenderator.getTotalRowCount());
appenderator.startJob();
Assert.assertEquals(0, appenderator.getTotalRowCount());
appenderator.add(IDENTIFIERS.get(0), IR("2000", "foo", 1), committerSupplier);
Assert.assertEquals(1, appenderator.getTotalRowCount());
appenderator.add(IDENTIFIERS.get(1), IR("2000", "bar", 1), committerSupplier);
Assert.assertEquals(2, appenderator.getTotalRowCount());
appenderator.persistAll(committerSupplier.get()).get();
Assert.assertEquals(2, appenderator.getTotalRowCount());
appenderator.drop(IDENTIFIERS.get(0)).get();
Assert.assertEquals(1, appenderator.getTotalRowCount());
appenderator.drop(IDENTIFIERS.get(1)).get();
Assert.assertEquals(0, appenderator.getTotalRowCount());
appenderator.add(IDENTIFIERS.get(2), IR("2001", "bar", 1), committerSupplier);
Assert.assertEquals(1, appenderator.getTotalRowCount());
appenderator.add(IDENTIFIERS.get(2), IR("2001", "baz", 1), committerSupplier);
Assert.assertEquals(2, appenderator.getTotalRowCount());
appenderator.add(IDENTIFIERS.get(2), IR("2001", "qux", 1), committerSupplier);
Assert.assertEquals(3, appenderator.getTotalRowCount());
appenderator.add(IDENTIFIERS.get(2), IR("2001", "bob", 1), committerSupplier);
Assert.assertEquals(4, appenderator.getTotalRowCount());
appenderator.persistAll(committerSupplier.get()).get();
Assert.assertEquals(4, appenderator.getTotalRowCount());
appenderator.drop(IDENTIFIERS.get(2)).get();
Assert.assertEquals(0, appenderator.getTotalRowCount());
appenderator.close();
Assert.assertEquals(0, appenderator.getTotalRowCount());
}
}
@Test
public void testQueryByIntervals() throws Exception
{
try (final AppenderatorTester tester = new AppenderatorTester(2)) {
try (final AppenderatorTester tester = new AppenderatorTester(2, true)) {
final Appenderator appenderator = tester.getAppenderator();
appenderator.startJob();
@ -379,7 +423,7 @@ public class AppenderatorTest
@Test
public void testQueryBySegments() throws Exception
{
try (final AppenderatorTester tester = new AppenderatorTester(2)) {
try (final AppenderatorTester tester = new AppenderatorTester(2, true)) {
final Appenderator appenderator = tester.getAppenderator();
appenderator.startJob();


@ -89,12 +89,21 @@ public class AppenderatorTester implements AutoCloseable
final int maxRowsInMemory
)
{
this(maxRowsInMemory, null);
this(maxRowsInMemory, null, false);
}
public AppenderatorTester(
final int maxRowsInMemory,
final File basePersistDirectory
final boolean enablePushFailure
)
{
this(maxRowsInMemory, null, enablePushFailure);
}
public AppenderatorTester(
final int maxRowsInMemory,
final File basePersistDirectory,
final boolean enablePushFailure
)
{
objectMapper = new DefaultObjectMapper();
@ -169,6 +178,8 @@ public class AppenderatorTester implements AutoCloseable
EmittingLogger.registerEmitter(emitter);
dataSegmentPusher = new DataSegmentPusher()
{
private boolean mustFail = true;
@Deprecated
@Override
public String getPathForHadoop(String dataSource)
@ -185,6 +196,12 @@ public class AppenderatorTester implements AutoCloseable
@Override
public DataSegment push(File file, DataSegment segment) throws IOException
{
if (enablePushFailure && mustFail) {
mustFail = false;
throw new IOException("Push failure test");
} else if (enablePushFailure) {
mustFail = true;
}
pushedSegments.add(segment);
return segment;
}