From ff52581bd305c8b64d4f040cc37fa8310fc69ef3 Mon Sep 17 00:00:00 2001
From: David Lim
Date: Wed, 18 Jan 2017 15:24:37 -0700
Subject: [PATCH] IndexTask improvements (#3611)

* index task improvements

* code review changes

* add null check
---
 .../java/io/druid/data/input/Firehose.java    |   2 +-
 docs/content/ingestion/index.md               |   2 +-
 docs/content/ingestion/tasks.md               |  22 +-
 .../druid/indexing/common/task/IndexTask.java | 851 +++++++++++-------
 .../overlord/TaskStorageQueryAdapter.java     |   3 +
 .../indexing/common/task/IndexTaskTest.java   | 485 +++++-----
 .../indexing/common/task/TaskSerdeTest.java   | 187 +++-
 .../indexing/overlord/TaskLifecycleTest.java  |  24 +-
 .../java/io/druid/guice/FirehoseModule.java   |   2 +
 .../granularity/ArbitraryGranularitySpec.java |   5 +
 .../indexing/granularity/GranularitySpec.java |  10 +-
 .../granularity/UniformGranularitySpec.java   |   5 +
 .../firehose/ReplayableFirehoseFactory.java   | 319 +++
 .../NoopSegmentHandoffNotifierFactory.java    |  53 ++
 .../ReplayableFirehoseFactoryTest.java        | 447 +++
 15 files changed, 1788 insertions(+), 629 deletions(-)
 create mode 100644 server/src/main/java/io/druid/segment/realtime/firehose/ReplayableFirehoseFactory.java
 create mode 100644 server/src/main/java/io/druid/segment/realtime/plumber/NoopSegmentHandoffNotifierFactory.java
 create mode 100644 server/src/test/java/io/druid/realtime/firehose/ReplayableFirehoseFactoryTest.java

diff --git a/api/src/main/java/io/druid/data/input/Firehose.java b/api/src/main/java/io/druid/data/input/Firehose.java
index a3df7bc1295..a768e778d81 100644
--- a/api/src/main/java/io/druid/data/input/Firehose.java
+++ b/api/src/main/java/io/druid/data/input/Firehose.java
@@ -53,7 +53,7 @@ public interface Firehose extends Closeable
    *
    * @return The next row
    */
-  public InputRow nextRow() ;
+  public InputRow nextRow();
 
   /**
    * Returns a runnable that will "commit" everything read up to the point at which commit() is called. This is
diff --git a/docs/content/ingestion/index.md b/docs/content/ingestion/index.md
index 07c7d7e036a..117e54009bd 100644
--- a/docs/content/ingestion/index.md
+++ b/docs/content/ingestion/index.md
@@ -187,7 +187,7 @@ This spec is used to generated segments with uniform intervals.
 | segmentGranularity | string | The granularity to create segments at. | no (default == 'DAY') |
 | queryGranularity | string | The minimum granularity to be able to query results at and the granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows. | no (default == 'NONE') |
 | rollup | boolean | rollup or not | no (default == true) |
-| intervals | string | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | yes for batch, no for real-time |
+| intervals | string | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | yes for Hadoop ingestion, no otherwise |
 | timezone | string | The timezone to represent the interval offsets in. Only valid if intervals are explicitly specified for batch ingestion. Will not be valid for kafka based ingestion.
| no (default == 'UTC') ### Arbitrary Granularity Spec diff --git a/docs/content/ingestion/tasks.md b/docs/content/ingestion/tasks.md index 0885a15a276..64fa14679fc 100644 --- a/docs/content/ingestion/tasks.md +++ b/docs/content/ingestion/tasks.md @@ -76,9 +76,8 @@ The Index Task is a simpler variation of the Index Hadoop task that is designed }, "tuningConfig" : { "type" : "index", - "targetPartitionSize" : -1, - "rowFlushBoundary" : 0, - "numShards": 1 + "targetPartitionSize" : 5000000, + "maxRowsInMemory" : 75000 } } } @@ -100,7 +99,12 @@ See [Ingestion](../ingestion/index.html) #### IOConfig -This field is required. You can specify a type of [Firehose](../ingestion/firehose.html) here. +|property|description|default|required?| +|--------|-----------|-------|---------| +|type|The task type, this should always be "index".|none|yes| +|firehose|Specify a [Firehose](../ingestion/firehose.html) here.|none|yes| +|appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This will only work if the existing segment set has extendable-type shardSpecs (which can be forced by setting 'forceExtendableShardSpecs' in the tuning config).|false|no| +|skipFirehoseCaching|By default the IndexTask will fully read the supplied firehose to disk before processing the data. This prevents the task from doing multiple remote fetches and enforces determinism if more than one pass through the data is required. It also allows the task to retry fetching the data if the firehose throws an exception during reading. This requires sufficient disk space for the temporary cache.|false|no| #### TuningConfig @@ -108,13 +112,15 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |property|description|default|required?| |--------|-----------|-------|---------| -|type|The task type, this should always be "index".|None.|yes| -|targetPartitionSize|Used in sharding. Determines how many rows are in each segment. Set this to -1 to use numShards instead for sharding.|5000000|no| -|rowFlushBoundary|Used in determining when intermediate persist should occur to disk.|75000|no| -|numShards|Directly specify the number of shards to create. You can skip the intermediate persist step if you specify the number of shards you want and set targetPartitionSize=-1.|null|no| +|type|The task type, this should always be "index".|none|yes| +|targetPartitionSize|Used in sharding. Determines how many rows are in each segment.|5000000|no| +|maxRowsInMemory|Used in determining when intermediate persists to disk should occur.|75000|no| +|numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if targetPartitionSize is set.|null|no| |indexSpec|defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no| +|maxPendingPersists|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. 
Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)|no| |buildV9Directly|Whether to build a v9 index directly instead of first building a v8 index and then converting it to v9 format.|true|no| |forceExtendableShardSpecs|Forces use of extendable shardSpecs. Experimental feature intended for use with the [Kafka indexing service extension](../development/extensions-core/kafka-ingestion.html).|false|no| +|reportParseExceptions|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|false|no| #### IndexSpec diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index b25fcac3564..a30a0acc8a4 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -25,48 +25,72 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; +import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSortedMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; +import com.google.common.collect.Maps; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; - +import io.druid.common.utils.JodaUtils; import io.druid.data.input.Committer; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; import io.druid.data.input.InputRow; import io.druid.data.input.Rows; import io.druid.granularity.QueryGranularity; +import io.druid.guice.annotations.Smile; +import io.druid.indexing.appenderator.ActionBasedSegmentAllocator; +import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; -import io.druid.indexing.common.index.YeOldePlumberSchool; +import io.druid.indexing.common.actions.LockAcquireAction; +import io.druid.indexing.common.actions.LockTryAcquireAction; +import io.druid.indexing.common.actions.SegmentTransactionalInsertAction; +import io.druid.indexing.common.actions.TaskActionClient; import io.druid.java.util.common.ISE; import io.druid.java.util.common.guava.Comparators; import io.druid.java.util.common.logger.Logger; +import io.druid.java.util.common.parsers.ParseException; +import io.druid.query.DruidMetrics; import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector; import io.druid.segment.IndexSpec; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.IOConfig; import io.druid.segment.indexing.IngestionSpec; -import io.druid.segment.indexing.RealtimeTuningConfig; +import io.druid.segment.indexing.RealtimeIOConfig; import io.druid.segment.indexing.TuningConfig; import io.druid.segment.indexing.granularity.GranularitySpec; -import 
io.druid.segment.loading.DataSegmentPusher; +import io.druid.segment.realtime.FireDepartment; import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.RealtimeMetricsMonitor; +import io.druid.segment.realtime.appenderator.Appenderator; +import io.druid.segment.realtime.appenderator.AppenderatorConfig; +import io.druid.segment.realtime.appenderator.Appenderators; +import io.druid.segment.realtime.appenderator.FiniteAppenderatorDriver; +import io.druid.segment.realtime.appenderator.SegmentAllocator; +import io.druid.segment.realtime.appenderator.SegmentIdentifier; +import io.druid.segment.realtime.appenderator.SegmentsAndMetadata; +import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; +import io.druid.segment.realtime.firehose.ReplayableFirehoseFactory; import io.druid.segment.realtime.plumber.Committers; -import io.druid.segment.realtime.plumber.Plumber; +import io.druid.segment.realtime.plumber.NoopSegmentHandoffNotifierFactory; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.HashBasedNumberedShardSpec; import io.druid.timeline.partition.NoneShardSpec; import io.druid.timeline.partition.NumberedShardSpec; import io.druid.timeline.partition.ShardSpec; +import io.druid.timeline.partition.ShardSpecLookup; import org.joda.time.DateTime; import org.joda.time.Interval; +import org.joda.time.Period; import javax.annotation.Nullable; import java.io.File; @@ -75,39 +99,15 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedSet; -import java.util.concurrent.CopyOnWriteArrayList; -public class IndexTask extends AbstractFixedIntervalTask +public class IndexTask extends AbstractTask { private static final Logger log = new Logger(IndexTask.class); - - private static HashFunction hashFunction = Hashing.murmur3_128(); - - /** - * Should we index this inputRow? Decision is based on our interval and shardSpec. - * - * @param inputRow the row to check - * - * @return true or false - */ - private static boolean shouldIndex( - final ShardSpec shardSpec, - final Interval interval, - final InputRow inputRow, - final QueryGranularity rollupGran - ) - { - return interval.contains(inputRow.getTimestampFromEpoch()) - && shardSpec.isInChunk(rollupGran.truncate(inputRow.getTimestampFromEpoch()), inputRow); - } + private static final HashFunction hashFunction = Hashing.murmur3_128(); private static String makeId(String id, IndexIngestionSpec ingestionSchema) { - if (id == null) { - return String.format("index_%s_%s", makeDataSource(ingestionSchema), new DateTime().toString()); - } - - return id; + return id != null ? 
id : String.format("index_%s_%s", makeDataSource(ingestionSchema), new DateTime()); } private static String makeDataSource(IndexIngestionSpec ingestionSchema) @@ -115,67 +115,23 @@ public class IndexTask extends AbstractFixedIntervalTask return ingestionSchema.getDataSchema().getDataSource(); } - private static Interval makeInterval(IndexIngestionSpec ingestionSchema) - { - GranularitySpec spec = ingestionSchema.getDataSchema().getGranularitySpec(); - - return new Interval( - spec.bucketIntervals().get().first().getStart(), - spec.bucketIntervals().get().last().getEnd() - ); - } - - static RealtimeTuningConfig convertTuningConfig( - ShardSpec shardSpec, - int rowFlushBoundary, - IndexSpec indexSpec, - boolean buildV9Directly - ) - { - return new RealtimeTuningConfig( - rowFlushBoundary, - null, - null, - null, - null, - null, - null, - shardSpec, - indexSpec, - buildV9Directly, - 0, - 0, - true, - null - ); - } - @JsonIgnore private final IndexIngestionSpec ingestionSchema; - - private final ObjectMapper jsonMapper; + private final ObjectMapper smileMapper; @JsonCreator public IndexTask( - @JsonProperty("id") String id, - @JsonProperty("resource") TaskResource taskResource, - @JsonProperty("spec") IndexIngestionSpec ingestionSchema, - @JacksonInject ObjectMapper jsonMapper, - @JsonProperty("context") Map context + @JsonProperty("id") final String id, + @JsonProperty("resource") final TaskResource taskResource, + @JsonProperty("spec") final IndexIngestionSpec ingestionSchema, + @JsonProperty("context") final Map context, + @Smile @JacksonInject final ObjectMapper smileMapper ) { - super( - // _not_ the version, just something uniqueish - makeId(id, ingestionSchema), - taskResource, - makeDataSource(ingestionSchema), - makeInterval(ingestionSchema), - context - ); - + super(makeId(id, ingestionSchema), null, taskResource, makeDataSource(ingestionSchema), context); this.ingestionSchema = ingestionSchema; - this.jsonMapper = jsonMapper; + this.smileMapper = smileMapper; } @Override @@ -184,6 +140,19 @@ public class IndexTask extends AbstractFixedIntervalTask return "index"; } + @Override + public boolean isReady(TaskActionClient taskActionClient) throws Exception + { + Optional> intervals = ingestionSchema.getDataSchema().getGranularitySpec().bucketIntervals(); + + if (intervals.isPresent()) { + Interval interval = JodaUtils.umbrellaInterval(intervals.get()); + return taskActionClient.submit(new LockTryAcquireAction(interval)) != null; + } else { + return true; + } + } + @JsonProperty("spec") public IndexIngestionSpec getIngestionSchema() { @@ -191,268 +160,362 @@ public class IndexTask extends AbstractFixedIntervalTask } @Override - public TaskStatus run(TaskToolbox toolbox) throws Exception + public TaskStatus run(final TaskToolbox toolbox) throws Exception { - final GranularitySpec granularitySpec = ingestionSchema.getDataSchema().getGranularitySpec(); - final int targetPartitionSize = ingestionSchema.getTuningConfig().getTargetPartitionSize(); + final boolean determineIntervals = !ingestionSchema.getDataSchema() + .getGranularitySpec() + .bucketIntervals() + .isPresent(); - final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox)); - final Set segments = Sets.newHashSet(); - - final Set validIntervals = Sets.intersection(granularitySpec.bucketIntervals().get(), getDataIntervals()); - if (validIntervals.isEmpty()) { - throw new ISE("No valid data intervals found. 
Check your configs!"); + final FirehoseFactory delegateFirehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory(); + final FirehoseFactory firehoseFactory; + if (ingestionSchema.getIOConfig().isSkipFirehoseCaching() + || delegateFirehoseFactory instanceof ReplayableFirehoseFactory) { + firehoseFactory = delegateFirehoseFactory; + } else { + firehoseFactory = new ReplayableFirehoseFactory( + delegateFirehoseFactory, + ingestionSchema.getTuningConfig().isReportParseExceptions(), + null, + null, + smileMapper + ); } - for (final Interval bucket : validIntervals) { - final List shardSpecs; - if (targetPartitionSize > 0) { - shardSpecs = determinePartitions(bucket, targetPartitionSize, granularitySpec.getQueryGranularity()); - } else { - int numShards = ingestionSchema.getTuningConfig().getNumShards(); - if (numShards > 0) { - shardSpecs = Lists.newArrayList(); - for (int i = 0; i < numShards; i++) { - shardSpecs.add(new HashBasedNumberedShardSpec(i, numShards, null, jsonMapper)); - } - } else { - shardSpecs = ImmutableList.of(NoneShardSpec.instance()); - } - } - for (final ShardSpec shardSpec : shardSpecs) { - // ShardSpec to be published. - final ShardSpec shardSpecForPublishing; - if (ingestionSchema.getTuningConfig().isForceExtendableShardSpecs()) { - shardSpecForPublishing = new NumberedShardSpec( - shardSpec.getPartitionNum(), - shardSpecs.size() - ); - } else { - shardSpecForPublishing = shardSpec; - } + final Map> shardSpecs = determineShardSpecs(toolbox, firehoseFactory); - final DataSegment segment = generateSegment( - toolbox, - ingestionSchema.getDataSchema(), - shardSpec, - shardSpecForPublishing, - bucket, - myLock.getVersion() - ); - segments.add(segment); - } + final String version; + final DataSchema dataSchema; + if (determineIntervals) { + Interval interval = JodaUtils.umbrellaInterval(shardSpecs.keySet()); + TaskLock lock = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval)); + version = lock.getVersion(); + dataSchema = ingestionSchema.getDataSchema().withGranularitySpec( + ingestionSchema.getDataSchema() + .getGranularitySpec() + .withIntervals( + JodaUtils.condenseIntervals( + shardSpecs.keySet() + ) + ) + ); + } else { + version = Iterables.getOnlyElement(getTaskLocks(toolbox)).getVersion(); + dataSchema = ingestionSchema.getDataSchema(); + } + + if (generateAndPublishSegments(toolbox, dataSchema, shardSpecs, version, firehoseFactory)) { + return TaskStatus.success(getId()); + } else { + return TaskStatus.failure(getId()); } - toolbox.publishSegments(segments); - return TaskStatus.success(getId()); } - private SortedSet getDataIntervals() throws IOException - { - final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory(); - final GranularitySpec granularitySpec = ingestionSchema.getDataSchema().getGranularitySpec(); - - SortedSet retVal = Sets.newTreeSet(Comparators.intervalsByStartThenEnd()); - int unparsed = 0; - try (Firehose firehose = firehoseFactory.connect(ingestionSchema.getDataSchema().getParser())) { - while (firehose.hasMore()) { - final InputRow inputRow = firehose.nextRow(); - DateTime dt = new DateTime(inputRow.getTimestampFromEpoch()); - Optional interval = granularitySpec.bucketInterval(dt); - if (interval.isPresent()) { - retVal.add(interval.get()); - } else { - unparsed++; - } - } - } - if (unparsed > 0) { - log.warn("Unable to to find a matching interval for [%,d] events", unparsed); - } - - return retVal; - } - - private List determinePartitions( - final Interval interval, - final int 
targetPartitionSize, - final QueryGranularity queryGranularity + /** + * Determines the number of shards for each interval using a hash of queryGranularity timestamp + all dimensions (i.e + * hash-based partitioning). In the future we may want to also support single-dimension partitioning. + */ + private Map> determineShardSpecs( + final TaskToolbox toolbox, + final FirehoseFactory firehoseFactory ) throws IOException { - log.info("Determining partitions for interval[%s] with targetPartitionSize[%d]", interval, targetPartitionSize); + final ObjectMapper jsonMapper = toolbox.getObjectMapper(); + final GranularitySpec granularitySpec = ingestionSchema.getDataSchema().getGranularitySpec(); + final QueryGranularity queryGranularity = granularitySpec.getQueryGranularity(); + final boolean determineNumPartitions = ingestionSchema.getTuningConfig().getNumShards() == null; + final boolean determineIntervals = !ingestionSchema.getDataSchema() + .getGranularitySpec() + .bucketIntervals() + .isPresent(); - final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory(); + final Map> shardSpecs = Maps.newHashMap(); - // The implementation of this determine partitions stuff is less than optimal. Should be done better. - // Use HLL to estimate number of rows - HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector(); + // if we were given number of shards per interval and the intervals, we don't need to scan the data + if (!determineNumPartitions && !determineIntervals) { + log.info("numShards and intervals provided, skipping determine partition scan"); + final SortedSet intervals = ingestionSchema.getDataSchema() + .getGranularitySpec() + .bucketIntervals() + .get(); + final int numShards = ingestionSchema.getTuningConfig().getNumShards(); - // Load data - try (Firehose firehose = firehoseFactory.connect(ingestionSchema.getDataSchema().getParser())) { + for (Interval interval : intervals) { + final List intervalShardSpecs = Lists.newArrayListWithCapacity(numShards); + if (numShards > 1) { + for (int i = 0; i < numShards; i++) { + intervalShardSpecs.add(new HashBasedNumberedShardSpec(i, numShards, null, jsonMapper)); + } + } else { + intervalShardSpecs.add(NoneShardSpec.instance()); + } + shardSpecs.put(interval, intervalShardSpecs); + } + + return shardSpecs; + } + + // determine intervals containing data and prime HLL collectors + final Map> hllCollectors = Maps.newHashMap(); + int thrownAway = 0; + + log.info("Determining intervals and shardSpecs"); + long determineShardSpecsStartMillis = System.currentTimeMillis(); + try (final Firehose firehose = firehoseFactory.connect(ingestionSchema.getDataSchema().getParser())) { while (firehose.hasMore()) { final InputRow inputRow = firehose.nextRow(); - if (interval.contains(inputRow.getTimestampFromEpoch())) { - final List groupKey = Rows.toGroupKey( - queryGranularity.truncate(inputRow.getTimestampFromEpoch()), - inputRow - ); - collector.add( - hashFunction.hashBytes(jsonMapper.writeValueAsBytes(groupKey)).asBytes() - ); + + final Interval interval; + if (determineIntervals) { + interval = granularitySpec.getSegmentGranularity().bucket(inputRow.getTimestamp()); + } else { + final Optional optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp()); + if (!optInterval.isPresent()) { + thrownAway++; + continue; + } + interval = optInterval.get(); } + + if (!determineNumPartitions) { + // we don't need to determine partitions but we still need to determine intervals, so add an Optional.absent() + // for 
the interval and don't instantiate a HLL collector + if (!hllCollectors.containsKey(interval)) { + hllCollectors.put(interval, Optional.absent()); + } + continue; + } + + if (!hllCollectors.containsKey(interval)) { + hllCollectors.put(interval, Optional.of(HyperLogLogCollector.makeLatestCollector())); + } + + List groupKey = Rows.toGroupKey(queryGranularity.truncate(inputRow.getTimestampFromEpoch()), inputRow); + hllCollectors.get(interval).get().add(hashFunction.hashBytes(jsonMapper.writeValueAsBytes(groupKey)).asBytes()); } } - final double numRows = collector.estimateCardinality(); - log.info("Estimated approximately [%,f] rows of data.", numRows); - - int numberOfShards = (int) Math.ceil(numRows / targetPartitionSize); - if ((double) numberOfShards > numRows) { - numberOfShards = (int) numRows; + if (thrownAway > 0) { + log.warn("Unable to to find a matching interval for [%,d] events", thrownAway); } - log.info("Will require [%,d] shard(s).", numberOfShards); - // ShardSpecs we will return - final List shardSpecs = Lists.newArrayList(); + final ImmutableSortedMap> sortedMap = ImmutableSortedMap.copyOf( + hllCollectors, + Comparators.intervalsByStartThenEnd() + ); - if (numberOfShards == 1) { - shardSpecs.add(NoneShardSpec.instance()); - } else { - for (int i = 0; i < numberOfShards; ++i) { - shardSpecs.add(new HashBasedNumberedShardSpec(i, numberOfShards, null, jsonMapper)); + for (final Map.Entry> entry : sortedMap.entrySet()) { + final Interval interval = entry.getKey(); + final Optional collector = entry.getValue(); + + final int numShards; + if (determineNumPartitions) { + final long numRows = new Double(collector.get().estimateCardinality()).longValue(); + numShards = (int) Math.ceil((double) numRows / ingestionSchema.getTuningConfig().getTargetPartitionSize()); + log.info("Estimated [%,d] rows of data for interval [%s], creating [%,d] shards", numRows, interval, numShards); + } else { + numShards = ingestionSchema.getTuningConfig().getNumShards(); + log.info("Creating [%,d] shards for interval [%s]", numShards, interval); } + + final List intervalShardSpecs = Lists.newArrayListWithCapacity(numShards); + if (numShards > 1) { + for (int i = 0; i < numShards; i++) { + intervalShardSpecs.add(new HashBasedNumberedShardSpec(i, numShards, null, jsonMapper)); + } + } else { + intervalShardSpecs.add(NoneShardSpec.instance()); + } + shardSpecs.put(interval, intervalShardSpecs); } + log.info("Found intervals and shardSpecs in %,dms", System.currentTimeMillis() - determineShardSpecsStartMillis); return shardSpecs; } - private DataSegment generateSegment( + private boolean generateAndPublishSegments( final TaskToolbox toolbox, - final DataSchema schema, - final ShardSpec shardSpecForPartitioning, - final ShardSpec shardSpecForPublishing, - final Interval interval, - final String version - ) throws IOException + final DataSchema dataSchema, + final Map> shardSpecs, + final String version, + final FirehoseFactory firehoseFactory + ) throws IOException, InterruptedException + { - // Set up temporary directory. 
- final File tmpDir = new File( - toolbox.getTaskWorkDir(), - String.format( - "%s_%s_%s_%s_%s", - this.getDataSource(), - interval.getStart(), - interval.getEnd(), - version, - shardSpecForPartitioning.getPartitionNum() - ) + final GranularitySpec granularitySpec = dataSchema.getGranularitySpec(); + final FireDepartment fireDepartmentForMetrics = new FireDepartment( + dataSchema, new RealtimeIOConfig(null, null, null), null ); + final FireDepartmentMetrics fireDepartmentMetrics = fireDepartmentForMetrics.getMetrics(); + final Map sequenceNameToShardSpecMap = Maps.newHashMap(); - final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory(); - final int rowFlushBoundary = ingestionSchema.getTuningConfig().getRowFlushBoundary(); - - // We need to track published segments. - final List pushedSegments = new CopyOnWriteArrayList(); - final DataSegmentPusher wrappedDataSegmentPusher = new DataSegmentPusher() - { - @Deprecated - @Override - public String getPathForHadoop(String dataSource) - { - return getPathForHadoop(); - } - - @Override - public String getPathForHadoop() - { - return toolbox.getSegmentPusher().getPathForHadoop(); - } - - @Override - public DataSegment push(File file, DataSegment segment) throws IOException - { - final DataSegment pushedSegment = toolbox.getSegmentPusher().push(file, segment); - pushedSegments.add(pushedSegment); - return pushedSegment; - } - }; - - // rowFlushBoundary for this job - final int myRowFlushBoundary = rowFlushBoundary > 0 - ? rowFlushBoundary - : toolbox.getConfig().getDefaultRowFlushBoundary(); - - // Create firehose + plumber - final FireDepartmentMetrics metrics = new FireDepartmentMetrics(); - final Firehose firehose = firehoseFactory.connect(ingestionSchema.getDataSchema().getParser()); - final Supplier committerSupplier = Committers.supplierFromFirehose(firehose); - final Plumber plumber = new YeOldePlumberSchool( - interval, - version, - wrappedDataSegmentPusher, - tmpDir, - toolbox.getIndexMerger(), - toolbox.getIndexMergerV9(), - toolbox.getIndexIO() - ).findPlumber( - schema, - convertTuningConfig( - shardSpecForPublishing, - myRowFlushBoundary, - ingestionSchema.getTuningConfig().getIndexSpec(), - ingestionSchema.tuningConfig.getBuildV9Directly() - ), - metrics - ); - - final QueryGranularity rollupGran = ingestionSchema.getDataSchema().getGranularitySpec().getQueryGranularity(); - try { - plumber.startJob(); - - while (firehose.hasMore()) { - final InputRow inputRow = firehose.nextRow(); - - if (shouldIndex(shardSpecForPartitioning, interval, inputRow, rollupGran)) { - int numRows = plumber.add(inputRow, committerSupplier); - if (numRows == -1) { - throw new ISE( - String.format( - "Was expecting non-null sink for timestamp[%s]", - new DateTime(inputRow.getTimestampFromEpoch()) - ) - ); - } - metrics.incrementProcessed(); - } else { - metrics.incrementThrownAway(); - } - } - } - finally { - firehose.close(); - } - - plumber.persist(committerSupplier.get()); - - try { - plumber.finishJob(); - } - finally { - log.info( - "Task[%s] interval[%s] partition[%d] took in %,d rows (%,d processed, %,d unparseable, %,d thrown away)" - + " and output %,d rows", - getId(), - interval, - shardSpecForPartitioning.getPartitionNum(), - metrics.processed() + metrics.unparseable() + metrics.thrownAway(), - metrics.processed(), - metrics.unparseable(), - metrics.thrownAway(), - metrics.rowOutput() + if (toolbox.getMonitorScheduler() != null) { + toolbox.getMonitorScheduler().addMonitor( + new RealtimeMetricsMonitor( + 
ImmutableList.of(fireDepartmentForMetrics), + ImmutableMap.of(DruidMetrics.TASK_ID, new String[]{getId()}) + ) ); } - // We expect a single segment to have been created. - return Iterables.getOnlyElement(pushedSegments); + final SegmentAllocator segmentAllocator; + if (ingestionSchema.getIOConfig().isAppendToExisting()) { + segmentAllocator = new ActionBasedSegmentAllocator(toolbox.getTaskActionClient(), dataSchema); + } else { + segmentAllocator = new SegmentAllocator() + { + @Override + public SegmentIdentifier allocate(DateTime timestamp, String sequenceName, String previousSegmentId) + throws IOException + { + Optional interval = granularitySpec.bucketInterval(timestamp); + if (!interval.isPresent()) { + throw new ISE("Could not find interval for timestamp [%s]", timestamp); + } + + ShardSpec shardSpec = sequenceNameToShardSpecMap.get(sequenceName); + if (shardSpec == null) { + throw new ISE("Could not find ShardSpec for sequenceName [%s]", sequenceName); + } + + return new SegmentIdentifier(getDataSource(), interval.get(), version, shardSpec); + } + }; + } + + try ( + final Appenderator appenderator = newAppenderator(fireDepartmentMetrics, toolbox, dataSchema); + final FiniteAppenderatorDriver driver = newDriver(appenderator, toolbox, segmentAllocator); + final Firehose firehose = firehoseFactory.connect(dataSchema.getParser()) + ) { + final Supplier committerSupplier = Committers.supplierFromFirehose(firehose); + final Map shardSpecLookups = Maps.newHashMap(); + + if (driver.startJob() != null) { + driver.clear(); + } + + try { + while (firehose.hasMore()) { + try { + final InputRow inputRow = firehose.nextRow(); + + final Optional optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp()); + if (!optInterval.isPresent()) { + fireDepartmentMetrics.incrementThrownAway(); + continue; + } + + final Interval interval = optInterval.get(); + if (!shardSpecLookups.containsKey(interval)) { + final List intervalShardSpecs = shardSpecs.get(interval); + if (intervalShardSpecs == null || intervalShardSpecs.isEmpty()) { + throw new ISE("Failed to get shardSpec for interval[%s]", interval); + } + shardSpecLookups.put(interval, intervalShardSpecs.get(0).getLookup(intervalShardSpecs)); + } + + final ShardSpec shardSpec = shardSpecLookups.get(interval) + .getShardSpec(inputRow.getTimestampFromEpoch(), inputRow); + + final String sequenceName = String.format("index_%s_%s_%d", interval, version, shardSpec.getPartitionNum()); + + if (!sequenceNameToShardSpecMap.containsKey(sequenceName)) { + final ShardSpec shardSpecForPublishing = ingestionSchema.getTuningConfig().isForceExtendableShardSpecs() + || ingestionSchema.getIOConfig().isAppendToExisting() + ? 
new NumberedShardSpec( + shardSpec.getPartitionNum(), + shardSpecs.get(interval).size() + ) + : shardSpec; + + sequenceNameToShardSpecMap.put(sequenceName, shardSpecForPublishing); + } + + final SegmentIdentifier identifier = driver.add(inputRow, sequenceName, committerSupplier); + + if (identifier == null) { + throw new ISE("Could not allocate segment for row with timestamp[%s]", inputRow.getTimestamp()); + } + + fireDepartmentMetrics.incrementProcessed(); + } + catch (ParseException e) { + if (ingestionSchema.getTuningConfig().isReportParseExceptions()) { + throw e; + } else { + fireDepartmentMetrics.incrementUnparseable(); + } + } + } + } + finally { + driver.persist(committerSupplier.get()); + } + + final TransactionalSegmentPublisher publisher = new TransactionalSegmentPublisher() + { + @Override + public boolean publishSegments(Set segments, Object commitMetadata) throws IOException + { + final SegmentTransactionalInsertAction action = new SegmentTransactionalInsertAction(segments, null, null); + return toolbox.getTaskActionClient().submit(action).isSuccess(); + } + }; + + final SegmentsAndMetadata published = driver.finish(publisher, committerSupplier.get()); + if (published == null) { + log.error("Failed to publish segments, aborting!"); + return false; + } else { + log.info( + "Published segments[%s]", Joiner.on(", ").join( + Iterables.transform( + published.getSegments(), + new Function() + { + @Override + public String apply(DataSegment input) + { + return input.getIdentifier(); + } + } + ) + ) + ); + return true; + } + } + } + + private Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox toolbox, DataSchema dataSchema) + { + return Appenderators.createOffline( + dataSchema, + ingestionSchema.getTuningConfig().withBasePersistDirectory(new File(toolbox.getTaskWorkDir(), "persist")), + metrics, + toolbox.getSegmentPusher(), + toolbox.getObjectMapper(), + toolbox.getIndexIO(), + ingestionSchema.getTuningConfig().isBuildV9Directly() ? toolbox.getIndexMergerV9() : toolbox.getIndexMerger() + ); + } + + private FiniteAppenderatorDriver newDriver( + final Appenderator appenderator, + final TaskToolbox toolbox, + final SegmentAllocator segmentAllocator + ) + { + return new FiniteAppenderatorDriver( + appenderator, + segmentAllocator, + new NoopSegmentHandoffNotifierFactory(), // don't wait for handoff since we don't serve queries + new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()), + toolbox.getObjectMapper(), + Integer.MAX_VALUE, // rows for a partition is already determined by the shardSpec + 0 + ); } public static class IndexIngestionSpec extends IngestionSpec @@ -472,7 +535,10 @@ public class IndexTask extends AbstractFixedIntervalTask this.dataSchema = dataSchema; this.ioConfig = ioConfig; - this.tuningConfig = tuningConfig == null ? new IndexTuningConfig(0, 0, null, null, null, false) : tuningConfig; + this.tuningConfig = tuningConfig == null + ? 
+ new IndexTuningConfig(null, null, null, null, null, null, null, null, (File) null) + : tuningConfig; } @Override @@ -500,14 +566,23 @@ public class IndexTask extends AbstractFixedIntervalTask @JsonTypeName("index") public static class IndexIOConfig implements IOConfig { + private static final boolean DEFAULT_APPEND_TO_EXISTING = false; + private static final boolean DEFAULT_SKIP_FIREHOSE_CACHING = false; + private final FirehoseFactory firehoseFactory; + private final boolean appendToExisting; + private final boolean skipFirehoseCaching; @JsonCreator public IndexIOConfig( - @JsonProperty("firehose") FirehoseFactory firehoseFactory + @JsonProperty("firehose") FirehoseFactory firehoseFactory, + @JsonProperty("appendToExisting") @Nullable Boolean appendToExisting, + @JsonProperty("skipFirehoseCaching") @Nullable Boolean skipFirehoseCaching ) { this.firehoseFactory = firehoseFactory; + this.appendToExisting = appendToExisting == null ? DEFAULT_APPEND_TO_EXISTING : appendToExisting; + this.skipFirehoseCaching = skipFirehoseCaching == null ? DEFAULT_SKIP_FIREHOSE_CACHING : skipFirehoseCaching; } @JsonProperty("firehose") @@ -515,80 +590,178 @@ public class IndexTask extends AbstractFixedIntervalTask { return firehoseFactory; } + + @JsonProperty("appendToExisting") + public boolean isAppendToExisting() + { + return appendToExisting; + } + + @JsonProperty("skipFirehoseCaching") + public boolean isSkipFirehoseCaching() + { + return skipFirehoseCaching; + } } @JsonTypeName("index") - public static class IndexTuningConfig implements TuningConfig + public static class IndexTuningConfig implements TuningConfig, AppenderatorConfig { private static final int DEFAULT_TARGET_PARTITION_SIZE = 5000000; - private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 75000; + private static final int DEFAULT_MAX_ROWS_IN_MEMORY = 75000; private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec(); - private static final Boolean DEFAULT_BUILD_V9_DIRECTLY = Boolean.TRUE; + private static final int DEFAULT_MAX_PENDING_PERSISTS = 0; + private static final boolean DEFAULT_BUILD_V9_DIRECTLY = true; + private static final boolean DEFAULT_FORCE_EXTENDABLE_SHARD_SPECS = false; + private static final boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = false; - private final int targetPartitionSize; - private final int rowFlushBoundary; - private final int numShards; + private final Integer targetPartitionSize; + private final int maxRowsInMemory; + private final Integer numShards; private final IndexSpec indexSpec; - private final Boolean buildV9Directly; + private final File basePersistDirectory; + private final int maxPendingPersists; + private final boolean buildV9Directly; private final boolean forceExtendableShardSpecs; + private final boolean reportParseExceptions; @JsonCreator public IndexTuningConfig( - @JsonProperty("targetPartitionSize") int targetPartitionSize, - @JsonProperty("rowFlushBoundary") int rowFlushBoundary, + @JsonProperty("targetPartitionSize") @Nullable Integer targetPartitionSize, + @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory, + @JsonProperty("rowFlushBoundary") @Nullable Integer rowFlushBoundary_forBackCompatibility, // DEPRECATED @JsonProperty("numShards") @Nullable Integer numShards, @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec, - @JsonProperty("buildV9Directly") Boolean buildV9Directly, - @JsonProperty("forceExtendableShardSpecs") boolean forceExtendableShardSpecs + @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists, + 
@JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly, + @JsonProperty("forceExtendableShardSpecs") @Nullable Boolean forceExtendableShardSpecs, + @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions ) { - this.targetPartitionSize = targetPartitionSize == 0 ? DEFAULT_TARGET_PARTITION_SIZE : targetPartitionSize; - Preconditions.checkArgument(rowFlushBoundary >= 0, "rowFlushBoundary should be positive or zero"); - this.rowFlushBoundary = rowFlushBoundary == 0 ? DEFAULT_ROW_FLUSH_BOUNDARY : rowFlushBoundary; - this.numShards = numShards == null ? -1 : numShards; - this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec; - Preconditions.checkArgument( - this.targetPartitionSize == -1 || this.numShards == -1, - "targetPartitionsSize and shardCount both cannot be set" + this( + targetPartitionSize, + maxRowsInMemory != null ? maxRowsInMemory : rowFlushBoundary_forBackCompatibility, + numShards, + indexSpec, + maxPendingPersists, + buildV9Directly, + forceExtendableShardSpecs, + reportParseExceptions, + null ); + } + + private IndexTuningConfig( + @Nullable Integer targetPartitionSize, + @Nullable Integer maxRowsInMemory, + @Nullable Integer numShards, + @Nullable IndexSpec indexSpec, + @Nullable Integer maxPendingPersists, + @Nullable Boolean buildV9Directly, + @Nullable Boolean forceExtendableShardSpecs, + @Nullable Boolean reportParseExceptions, + @Nullable File basePersistDirectory + ) + { + Preconditions.checkArgument( + targetPartitionSize == null || targetPartitionSize == -1 || numShards == null, + "targetPartitionSize and numShards cannot both be set" + ); + + this.targetPartitionSize = numShards != null + ? null + : (targetPartitionSize == null ? DEFAULT_TARGET_PARTITION_SIZE : targetPartitionSize); + this.maxRowsInMemory = maxRowsInMemory == null ? DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory; + this.numShards = numShards; + this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec; + this.maxPendingPersists = maxPendingPersists == null ? DEFAULT_MAX_PENDING_PERSISTS : maxPendingPersists; this.buildV9Directly = buildV9Directly == null ? DEFAULT_BUILD_V9_DIRECTLY : buildV9Directly; - this.forceExtendableShardSpecs = forceExtendableShardSpecs; + this.forceExtendableShardSpecs = forceExtendableShardSpecs == null + ? DEFAULT_FORCE_EXTENDABLE_SHARD_SPECS + : forceExtendableShardSpecs; + this.reportParseExceptions = reportParseExceptions == null + ? 
DEFAULT_REPORT_PARSE_EXCEPTIONS + : reportParseExceptions; + this.basePersistDirectory = basePersistDirectory; + } + + public IndexTuningConfig withBasePersistDirectory(File dir) + { + return new IndexTuningConfig( + targetPartitionSize, + maxRowsInMemory, + numShards, + indexSpec, + maxPendingPersists, + buildV9Directly, + forceExtendableShardSpecs, + reportParseExceptions, + dir + ); } @JsonProperty - public int getTargetPartitionSize() + public Integer getTargetPartitionSize() { return targetPartitionSize; } @JsonProperty - public int getRowFlushBoundary() + @Override + public int getMaxRowsInMemory() { - return rowFlushBoundary; + return maxRowsInMemory; } @JsonProperty - public int getNumShards() + public Integer getNumShards() { return numShards; } @JsonProperty + @Override public IndexSpec getIndexSpec() { return indexSpec; } + @Override + public File getBasePersistDirectory() + { + return basePersistDirectory; + } + @JsonProperty - public Boolean getBuildV9Directly() + @Override + public int getMaxPendingPersists() + { + return maxPendingPersists; + } + + @JsonProperty + public boolean isBuildV9Directly() { return buildV9Directly; } + @JsonProperty + @Override + public boolean isReportParseExceptions() + { + return reportParseExceptions; + } + @JsonProperty public boolean isForceExtendableShardSpecs() { return forceExtendableShardSpecs; } + + @Override + public Period getIntermediatePersistPeriod() + { + return new Period(Integer.MAX_VALUE); // intermediate persist doesn't make much sense for batch jobs + } } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorageQueryAdapter.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorageQueryAdapter.java index f8d649e5e36..29412ce961b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorageQueryAdapter.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorageQueryAdapter.java @@ -24,6 +24,7 @@ import com.google.common.collect.Sets; import com.google.inject.Inject; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.actions.SegmentInsertAction; +import io.druid.indexing.common.actions.SegmentTransactionalInsertAction; import io.druid.indexing.common.actions.TaskAction; import io.druid.indexing.common.task.Task; import io.druid.timeline.DataSegment; @@ -80,6 +81,8 @@ public class TaskStorageQueryAdapter for (final TaskAction action : storage.getAuditLogs(taskid)) { if (action instanceof SegmentInsertAction) { segments.addAll(((SegmentInsertAction) action).getSegments()); + } else if (action instanceof SegmentTransactionalInsertAction) { + segments.addAll(((SegmentTransactionalInsertAction) action).getSegments()); } } return segments; diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index 09d1be815e1..6ce78e6db8d 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -21,7 +21,6 @@ package io.druid.indexing.common.task; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; - import io.druid.data.input.impl.CSVParseSpec; import io.druid.data.input.impl.DimensionsSpec; import io.druid.data.input.impl.SpatialDimensionSchema; @@ -31,9 +30,13 @@ import io.druid.granularity.QueryGranularities; import io.druid.indexing.common.TaskLock; import 
io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.TestUtils; +import io.druid.indexing.common.actions.LockAcquireAction; import io.druid.indexing.common.actions.LockListAction; +import io.druid.indexing.common.actions.SegmentAllocateAction; +import io.druid.indexing.common.actions.SegmentTransactionalInsertAction; import io.druid.indexing.common.actions.TaskAction; import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.indexing.overlord.SegmentPublishResult; import io.druid.java.util.common.Granularity; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; @@ -42,13 +45,15 @@ import io.druid.segment.IndexMerger; import io.druid.segment.IndexMergerV9; import io.druid.segment.IndexSpec; import io.druid.segment.indexing.DataSchema; -import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.indexing.granularity.ArbitraryGranularitySpec; +import io.druid.segment.indexing.granularity.GranularitySpec; import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.loading.DataSegmentPusher; +import io.druid.segment.realtime.appenderator.SegmentIdentifier; import io.druid.segment.realtime.firehose.LocalFirehoseFactory; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.HashBasedNumberedShardSpec; +import io.druid.timeline.partition.NoneShardSpec; import io.druid.timeline.partition.NumberedShardSpec; import io.druid.timeline.partition.ShardSpec; import org.joda.time.DateTime; @@ -75,6 +80,7 @@ public class IndexTaskTest private IndexMerger indexMerger; private IndexMergerV9 indexMergerV9; private IndexIO indexIO; + private volatile int segmentAllocatePartitionCounter; public IndexTaskTest() { @@ -102,57 +108,9 @@ public class IndexTaskTest IndexTask indexTask = new IndexTask( null, null, - new IndexTask.IndexIngestionSpec( - new DataSchema( - "test", - jsonMapper.convertValue( - new StringInputRowParser( - new CSVParseSpec( - new TimestampSpec( - "ts", - "auto", - null - ), - new DimensionsSpec( - DimensionsSpec.getDefaultSchemas(Arrays.asList("ts")), - Lists.newArrayList(), - Lists.newArrayList() - ), - null, - Arrays.asList("ts", "dim", "val") - ), - null - ), - Map.class - ), - new AggregatorFactory[]{ - new LongSumAggregatorFactory("val", "val") - }, - new UniformGranularitySpec( - Granularity.DAY, - QueryGranularities.MINUTE, - Arrays.asList(new Interval("2014/2015")) - ), - jsonMapper - ), - new IndexTask.IndexIOConfig( - new LocalFirehoseFactory( - tmpDir, - "druid*", - null - ) - ), - new IndexTask.IndexTuningConfig( - 2, - 0, - null, - indexSpec, - null, - false - ) - ), - jsonMapper, - null + createIngestionSpec(tmpDir, null, 2, null, false, false), + null, + jsonMapper ); final List segments = runTask(indexTask); @@ -188,57 +146,9 @@ public class IndexTaskTest IndexTask indexTask = new IndexTask( null, null, - new IndexTask.IndexIngestionSpec( - new DataSchema( - "test", - jsonMapper.convertValue( - new StringInputRowParser( - new CSVParseSpec( - new TimestampSpec( - "ts", - "auto", - null - ), - new DimensionsSpec( - DimensionsSpec.getDefaultSchemas(Arrays.asList("ts")), - Lists.newArrayList(), - Lists.newArrayList() - ), - null, - Arrays.asList("ts", "dim", "val") - ), - null - ), - Map.class - ), - new AggregatorFactory[]{ - new LongSumAggregatorFactory("val", "val") - }, - new UniformGranularitySpec( - Granularity.DAY, - QueryGranularities.MINUTE, - Arrays.asList(new Interval("2014/2015")) - ), - jsonMapper - ), - 
new IndexTask.IndexIOConfig( - new LocalFirehoseFactory( - tmpDir, - "druid*", - null - ) - ), - new IndexTask.IndexTuningConfig( - 2, - 0, - null, - indexSpec, - null, - true - ) - ), - jsonMapper, - null + createIngestionSpec(tmpDir, null, 2, null, true, false), + null, + jsonMapper ); final List segments = runTask(indexTask); @@ -274,49 +184,19 @@ public class IndexTaskTest IndexTask indexTask = new IndexTask( null, null, - new IndexTask.IndexIngestionSpec( - new DataSchema( - "test", - jsonMapper.convertValue( - new StringInputRowParser( - new CSVParseSpec( - new TimestampSpec( - "ts", - "auto", - null - ), - new DimensionsSpec( - DimensionsSpec.getDefaultSchemas(Arrays.asList("ts")), - Lists.newArrayList(), - Lists.newArrayList() - ), - null, - Arrays.asList("ts", "dim", "val") - ), - null - ), - Map.class - ), - new AggregatorFactory[]{ - new LongSumAggregatorFactory("val", "val") - }, - new ArbitraryGranularitySpec( - QueryGranularities.MINUTE, - Arrays.asList(new Interval("2014/2015")) - ), - jsonMapper + createIngestionSpec( + tmpDir, + new ArbitraryGranularitySpec( + QueryGranularities.MINUTE, + Arrays.asList(new Interval("2014/2015")) ), - new IndexTask.IndexIOConfig( - new LocalFirehoseFactory( - tmpDir, - "druid*", - null - ) - ), - null + 10, + null, + false, + false ), - jsonMapper, - null + null, + jsonMapper ); List segments = runTask(indexTask); @@ -324,6 +204,160 @@ public class IndexTaskTest Assert.assertEquals(1, segments.size()); } + @Test + public void testIntervalBucketing() throws Exception + { + File tmpDir = temporaryFolder.newFolder(); + + File tmpFile = File.createTempFile("druid", "index", tmpDir); + + PrintWriter writer = new PrintWriter(tmpFile); + writer.println("2015-03-01T07:59:59.977Z,a,1"); + writer.println("2015-03-01T08:00:00.000Z,b,1"); + writer.close(); + + IndexTask indexTask = new IndexTask( + null, + null, + createIngestionSpec( + tmpDir, + new UniformGranularitySpec( + Granularity.HOUR, + QueryGranularities.HOUR, + Arrays.asList(new Interval("2015-03-01T08:00:00Z/2015-03-01T09:00:00Z")) + ), + 50, + null, + false, + false + ), + null, + jsonMapper + ); + + final List segments = runTask(indexTask); + + Assert.assertEquals(1, segments.size()); + } + + @Test + public void testNumShardsProvided() throws Exception + { + File tmpDir = temporaryFolder.newFolder(); + File tmpFile = File.createTempFile("druid", "index", tmpDir); + + PrintWriter writer = new PrintWriter(tmpFile); + writer.println("2014-01-01T00:00:10Z,a,1"); + writer.println("2014-01-01T01:00:20Z,b,1"); + writer.println("2014-01-01T02:00:30Z,c,1"); + writer.close(); + + IndexTask indexTask = new IndexTask( + null, + null, + createIngestionSpec(tmpDir, null, null, 1, false, false), + null, + jsonMapper + ); + + final List segments = runTask(indexTask); + + Assert.assertEquals(1, segments.size()); + + Assert.assertEquals("test", segments.get(0).getDataSource()); + Assert.assertEquals(new Interval("2014/P1D"), segments.get(0).getInterval()); + Assert.assertTrue(segments.get(0).getShardSpec().getClass().equals(NoneShardSpec.class)); + Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum()); + } + + @Test + public void testAppendToExisting() throws Exception + { + segmentAllocatePartitionCounter = 0; + File tmpDir = temporaryFolder.newFolder(); + File tmpFile = File.createTempFile("druid", "index", tmpDir); + + PrintWriter writer = new PrintWriter(tmpFile); + writer.println("2014-01-01T00:00:10Z,a,1"); + writer.println("2014-01-01T01:00:20Z,b,1"); + 
writer.println("2014-01-01T02:00:30Z,c,1"); + writer.close(); + + IndexTask indexTask = new IndexTask( + null, + null, + createIngestionSpec(tmpDir, null, 2, null, false, true), + null, + jsonMapper + ); + + final List segments = runTask(indexTask); + + Assert.assertEquals(2, segmentAllocatePartitionCounter); + Assert.assertEquals(2, segments.size()); + + Assert.assertEquals("test", segments.get(0).getDataSource()); + Assert.assertEquals(new Interval("2014/P1D"), segments.get(0).getInterval()); + Assert.assertTrue(segments.get(0).getShardSpec().getClass().equals(NumberedShardSpec.class)); + Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum()); + + Assert.assertEquals("test", segments.get(1).getDataSource()); + Assert.assertEquals(new Interval("2014/P1D"), segments.get(1).getInterval()); + Assert.assertTrue(segments.get(1).getShardSpec().getClass().equals(NumberedShardSpec.class)); + Assert.assertEquals(1, segments.get(1).getShardSpec().getPartitionNum()); + } + + @Test + public void testIntervalNotSpecified() throws Exception + { + File tmpDir = temporaryFolder.newFolder(); + File tmpFile = File.createTempFile("druid", "index", tmpDir); + + PrintWriter writer = new PrintWriter(tmpFile); + writer.println("2014-01-01T00:00:10Z,a,1"); + writer.println("2014-01-01T01:00:20Z,b,1"); + writer.println("2014-01-01T02:00:30Z,c,1"); + writer.close(); + + IndexTask indexTask = new IndexTask( + null, + null, + createIngestionSpec( + tmpDir, + new UniformGranularitySpec( + Granularity.HOUR, + QueryGranularities.MINUTE, + null + ), + 2, + null, + false, + false + ), + null, + jsonMapper + ); + + final List segments = runTask(indexTask); + + Assert.assertEquals(3, segments.size()); + + Assert.assertEquals("test", segments.get(0).getDataSource()); + Assert.assertEquals(new Interval("2014-01-01T00/PT1H"), segments.get(0).getInterval()); + Assert.assertTrue(segments.get(0).getShardSpec().getClass().equals(NoneShardSpec.class)); + Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum()); + + Assert.assertEquals("test", segments.get(1).getDataSource()); + Assert.assertEquals(new Interval("2014-01-01T01/PT1H"), segments.get(1).getInterval()); + Assert.assertTrue(segments.get(1).getShardSpec().getClass().equals(NoneShardSpec.class)); + Assert.assertEquals(0, segments.get(1).getShardSpec().getPartitionNum()); + + Assert.assertEquals("test", segments.get(2).getDataSource()); + Assert.assertEquals(new Interval("2014-01-01T02/PT1H"), segments.get(2).getInterval()); + Assert.assertTrue(segments.get(2).getShardSpec().getClass().equals(NoneShardSpec.class)); + Assert.assertEquals(0, segments.get(2).getShardSpec().getPartitionNum()); + } + private final List runTask(final IndexTask indexTask) throws Exception { final List segments = Lists.newArrayList(); @@ -342,6 +376,30 @@ public class IndexTaskTest ) ); } + + if (taskAction instanceof LockAcquireAction) { + return (RetType) new TaskLock( + "groupId", + "test", + ((LockAcquireAction) taskAction).getInterval(), + new DateTime().toString() + ); + } + + if (taskAction instanceof SegmentTransactionalInsertAction) { + return (RetType) new SegmentPublishResult( + ((SegmentTransactionalInsertAction) taskAction).getSegments(), + true + ); + } + + if (taskAction instanceof SegmentAllocateAction) { + SegmentAllocateAction action = (SegmentAllocateAction) taskAction; + Interval interval = action.getPreferredSegmentGranularity().bucket(action.getTimestamp()); + ShardSpec shardSpec = new NumberedShardSpec(segmentAllocatePartitionCounter++, 0); + 
return (RetType) new SegmentIdentifier(action.getDataSource(), interval, "latestVersion", shardSpec); + } + return null; } }, null, new DataSegmentPusher() @@ -365,7 +423,7 @@ public class IndexTaskTest segments.add(segment); return segment; } - }, null, null, null, null, null, null, null, null, null, null, temporaryFolder.newFolder(), + }, null, null, null, null, null, null, null, null, null, jsonMapper, temporaryFolder.newFolder(), indexMerger, indexIO, null, null, indexMergerV9 ) ); @@ -375,93 +433,66 @@ public class IndexTaskTest return segments; } - @Test - public void testIntervalBucketing() throws Exception + private IndexTask.IndexIngestionSpec createIngestionSpec( + File baseDir, + GranularitySpec granularitySpec, + Integer targetPartitionSize, + Integer numShards, + boolean forceExtendableShardSpecs, + boolean appendToExisting + ) { - File tmpDir = temporaryFolder.newFolder(); - - File tmpFile = File.createTempFile("druid", "index", tmpDir); - - PrintWriter writer = new PrintWriter(tmpFile); - writer.println("2015-03-01T07:59:59.977Z,a,1"); - writer.println("2015-03-01T08:00:00.000Z,b,1"); - writer.close(); - - IndexTask indexTask = new IndexTask( - null, - null, - new IndexTask.IndexIngestionSpec( - new DataSchema( - "test", - jsonMapper.convertValue( - new StringInputRowParser( - new CSVParseSpec( - new TimestampSpec( - "ts", - "auto", - null - ), - new DimensionsSpec( - DimensionsSpec.getDefaultSchemas(Arrays.asList("dim")), - Lists.newArrayList(), - Lists.newArrayList() - ), - null, - Arrays.asList("ts", "dim", "val") + return new IndexTask.IndexIngestionSpec( + new DataSchema( + "test", + jsonMapper.convertValue( + new StringInputRowParser( + new CSVParseSpec( + new TimestampSpec( + "ts", + "auto", + null ), - null + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim")), + Lists.newArrayList(), + Lists.newArrayList() + ), + null, + Arrays.asList("ts", "dim", "val") ), - Map.class - ), - new AggregatorFactory[]{ - new LongSumAggregatorFactory("val", "val") - }, - new UniformGranularitySpec( - Granularity.HOUR, - QueryGranularities.HOUR, - Arrays.asList(new Interval("2015-03-01T08:00:00Z/2015-03-01T09:00:00Z")) - ), - jsonMapper - ), - new IndexTask.IndexIOConfig( - new LocalFirehoseFactory( - tmpDir, - "druid*", null - ) + ), + Map.class ), - null + new AggregatorFactory[]{ + new LongSumAggregatorFactory("val", "val") + }, + granularitySpec != null ? 
granularitySpec : new UniformGranularitySpec( + Granularity.DAY, + QueryGranularities.MINUTE, + Arrays.asList(new Interval("2014/2015")) + ), + jsonMapper ), - jsonMapper, - null + new IndexTask.IndexIOConfig( + new LocalFirehoseFactory( + baseDir, + "druid*", + null + ), appendToExisting, null + ), + new IndexTask.IndexTuningConfig( + targetPartitionSize, + 1, + null, + numShards, + indexSpec, + null, + true, + forceExtendableShardSpecs, + true + ) ); - - final List segments = runTask(indexTask); - - Assert.assertEquals(1, segments.size()); } - - @Test - public void testConvertProps() - { - ShardSpec spec = new NumberedShardSpec(1, 2); - IndexTask.IndexTuningConfig config = new IndexTask.IndexTuningConfig( - 100, - 1000, - null, - new IndexSpec(), - null, - false - ); - RealtimeTuningConfig realtimeTuningConfig = IndexTask.convertTuningConfig( - spec, - config.getRowFlushBoundary(), - config.getIndexSpec(), - config.getBuildV9Directly() - ); - Assert.assertEquals(realtimeTuningConfig.getMaxRowsInMemory(), config.getRowFlushBoundary()); - Assert.assertEquals(realtimeTuningConfig.getShardSpec(), spec); - Assert.assertEquals(realtimeTuningConfig.getIndexSpec(), indexSpec); - } - } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index 1d4f84a314f..48d283cd98d 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; - import io.druid.client.indexing.ClientAppendQuery; import io.druid.client.indexing.ClientKillQuery; import io.druid.client.indexing.ClientMergeQuery; @@ -51,10 +50,13 @@ import io.druid.segment.realtime.plumber.Plumber; import io.druid.segment.realtime.plumber.PlumberSchool; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NoneShardSpec; +import org.hamcrest.CoreMatchers; import org.joda.time.Interval; import org.joda.time.Period; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.io.File; import java.io.IOException; @@ -65,6 +67,9 @@ public class TaskSerdeTest private final ObjectMapper jsonMapper; private final IndexSpec indexSpec = new IndexSpec(); + @Rule + public ExpectedException thrown = ExpectedException.none(); + public TaskSerdeTest() { TestUtils testUtils = new TestUtils(); @@ -75,6 +80,76 @@ public class TaskSerdeTest } } + @Test + public void testIndexTaskIOConfigDefaults() throws Exception + { + final IndexTask.IndexIOConfig ioConfig = jsonMapper.readValue( + "{\"type\":\"index\"}", + IndexTask.IndexIOConfig.class + ); + + Assert.assertEquals(false, ioConfig.isAppendToExisting()); + Assert.assertEquals(false, ioConfig.isSkipFirehoseCaching()); + } + + @Test + public void testIndexTaskTuningConfigDefaults() throws Exception + { + final IndexTask.IndexTuningConfig tuningConfig = jsonMapper.readValue( + "{\"type\":\"index\"}", + IndexTask.IndexTuningConfig.class + ); + + Assert.assertEquals(true, tuningConfig.isBuildV9Directly()); + Assert.assertEquals(false, tuningConfig.isForceExtendableShardSpecs()); + Assert.assertEquals(false, tuningConfig.isReportParseExceptions()); + Assert.assertEquals(new IndexSpec(), 
tuningConfig.getIndexSpec()); + Assert.assertEquals(new Period(Integer.MAX_VALUE), tuningConfig.getIntermediatePersistPeriod()); + Assert.assertEquals(0, tuningConfig.getMaxPendingPersists()); + Assert.assertEquals(75000, tuningConfig.getMaxRowsInMemory()); + Assert.assertEquals(null, tuningConfig.getNumShards()); + Assert.assertEquals(5000000, (int) tuningConfig.getTargetPartitionSize()); + } + + @Test + public void testIndexTaskTuningConfigTargetPartitionSizeOrNumShards() throws Exception + { + IndexTask.IndexTuningConfig tuningConfig = jsonMapper.readValue( + "{\"type\":\"index\", \"targetPartitionSize\":10}", + IndexTask.IndexTuningConfig.class + ); + + Assert.assertEquals(10, (int) tuningConfig.getTargetPartitionSize()); + Assert.assertEquals(null, tuningConfig.getNumShards()); + + tuningConfig = jsonMapper.readValue( + "{\"type\":\"index\", \"numShards\":10}", + IndexTask.IndexTuningConfig.class + ); + + Assert.assertEquals(null, tuningConfig.getTargetPartitionSize()); + Assert.assertEquals(10, (int) tuningConfig.getNumShards()); + + tuningConfig = jsonMapper.readValue( + "{\"type\":\"index\", \"targetPartitionSize\":-1, \"numShards\":10}", + IndexTask.IndexTuningConfig.class + ); + + Assert.assertEquals(null, tuningConfig.getTargetPartitionSize()); + Assert.assertEquals(10, (int) tuningConfig.getNumShards()); + } + + @Test + public void testIndexTaskTuningConfigTargetPartitionSizeAndNumShards() throws Exception + { + thrown.expectCause(CoreMatchers.isA(IllegalArgumentException.class)); + + jsonMapper.readValue( + "{\"type\":\"index\", \"targetPartitionSize\":10, \"numShards\":10}", + IndexTask.IndexTuningConfig.class + ); + } + @Test public void testIndexTaskSerde() throws Exception { @@ -93,11 +168,11 @@ public class TaskSerdeTest ), jsonMapper ), - new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null)), - new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec, null, false) + new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true, true), + new IndexTask.IndexTuningConfig(10000, 10, 9999, null, indexSpec, 3, true, true, true) ), - jsonMapper, - null + null, + jsonMapper ); final String json = jsonMapper.writeValueAsString(task); @@ -106,14 +181,38 @@ public class TaskSerdeTest final IndexTask task2 = (IndexTask) jsonMapper.readValue(json, Task.class); Assert.assertEquals("foo", task.getDataSource()); - Assert.assertEquals(new Interval("2010-01-01/P2D"), task.getInterval()); Assert.assertEquals(task.getId(), task2.getId()); Assert.assertEquals(task.getGroupId(), task2.getGroupId()); Assert.assertEquals(task.getDataSource(), task2.getDataSource()); - Assert.assertEquals(task.getInterval(), task2.getInterval()); - Assert.assertTrue(task.getIngestionSchema().getIOConfig().getFirehoseFactory() instanceof LocalFirehoseFactory); - Assert.assertTrue(task2.getIngestionSchema().getIOConfig().getFirehoseFactory() instanceof LocalFirehoseFactory); + + IndexTask.IndexIOConfig taskIoConfig = task.getIngestionSchema().getIOConfig(); + IndexTask.IndexIOConfig task2IoConfig = task2.getIngestionSchema().getIOConfig(); + + Assert.assertTrue(taskIoConfig.getFirehoseFactory() instanceof LocalFirehoseFactory); + Assert.assertTrue(task2IoConfig.getFirehoseFactory() instanceof LocalFirehoseFactory); + Assert.assertEquals(taskIoConfig.isAppendToExisting(), task2IoConfig.isAppendToExisting()); + Assert.assertEquals(taskIoConfig.isSkipFirehoseCaching(), task2IoConfig.isSkipFirehoseCaching()); + + IndexTask.IndexTuningConfig taskTuningConfig = 
task.getIngestionSchema().getTuningConfig(); + IndexTask.IndexTuningConfig task2TuningConfig = task2.getIngestionSchema().getTuningConfig(); + + Assert.assertEquals(taskTuningConfig.getBasePersistDirectory(), task2TuningConfig.getBasePersistDirectory()); + Assert.assertEquals(taskTuningConfig.getIndexSpec(), task2TuningConfig.getIndexSpec()); + Assert.assertEquals( + taskTuningConfig.getIntermediatePersistPeriod(), + task2TuningConfig.getIntermediatePersistPeriod() + ); + Assert.assertEquals(taskTuningConfig.getMaxPendingPersists(), task2TuningConfig.getMaxPendingPersists()); + Assert.assertEquals(taskTuningConfig.getMaxRowsInMemory(), task2TuningConfig.getMaxRowsInMemory()); + Assert.assertEquals(taskTuningConfig.getNumShards(), task2TuningConfig.getNumShards()); + Assert.assertEquals(taskTuningConfig.getTargetPartitionSize(), task2TuningConfig.getTargetPartitionSize()); + Assert.assertEquals(taskTuningConfig.isBuildV9Directly(), task2TuningConfig.isBuildV9Directly()); + Assert.assertEquals( + taskTuningConfig.isForceExtendableShardSpecs(), + task2TuningConfig.isForceExtendableShardSpecs() + ); + Assert.assertEquals(taskTuningConfig.isReportParseExceptions(), task2TuningConfig.isReportParseExceptions()); } @Test @@ -134,11 +233,11 @@ public class TaskSerdeTest ), jsonMapper ), - new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null)), - new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec, null, false) + new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true, null), + new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true) ), - jsonMapper, - null + null, + jsonMapper ); for (final Module jacksonModule : new FirehoseModule().getJacksonModules()) { @@ -151,7 +250,6 @@ public class TaskSerdeTest final IndexTask task2 = (IndexTask) jsonMapper.readValue(json, Task.class); Assert.assertEquals("foo", task.getDataSource()); - Assert.assertEquals(new Interval("2010-01-01/P2D"), task.getInterval()); Assert.assertEquals(task.getId(), task2.getId()); Assert.assertEquals(2, task.getTaskResource().getRequiredCapacity()); @@ -160,7 +258,6 @@ public class TaskSerdeTest Assert.assertEquals(task.getTaskResource().getAvailabilityGroup(), task2.getTaskResource().getAvailabilityGroup()); Assert.assertEquals(task.getGroupId(), task2.getGroupId()); Assert.assertEquals(task.getDataSource(), task2.getDataSource()); - Assert.assertEquals(task.getInterval(), task2.getInterval()); Assert.assertTrue(task.getIngestionSchema().getIOConfig().getFirehoseFactory() instanceof LocalFirehoseFactory); Assert.assertTrue(task2.getIngestionSchema().getIOConfig().getFirehoseFactory() instanceof LocalFirehoseFactory); } @@ -168,11 +265,13 @@ public class TaskSerdeTest @Test public void testMergeTaskSerde() throws Exception { - final List segments = ImmutableList.of(DataSegment.builder() - .dataSource("foo") - .interval(new Interval("2010-01-01/P1D")) - .version("1234") - .build()); + final List segments = ImmutableList.of( + DataSegment.builder() + .dataSource("foo") + .interval(new Interval("2010-01-01/P1D")) + .version("1234") + .build() + ); final List aggregators = ImmutableList.of(new CountAggregatorFactory("cnt")); final MergeTask task = new MergeTask( null, @@ -202,11 +301,15 @@ public class TaskSerdeTest task2.getAggregators().get(0).getName() ); - final MergeTask task3 = (MergeTask) jsonMapper.readValue(jsonMapper.writeValueAsString(new ClientMergeQuery( - "foo", - segments, - aggregators - )), Task.class); + final MergeTask 
task3 = (MergeTask) jsonMapper.readValue( + jsonMapper.writeValueAsString( + new ClientMergeQuery( + "foo", + segments, + aggregators + ) + ), Task.class + ); Assert.assertEquals("foo", task3.getDataSource()); Assert.assertEquals(new Interval("2010-01-01/P1D"), task3.getInterval()); @@ -237,10 +340,14 @@ public class TaskSerdeTest Assert.assertEquals(task.getDataSource(), task2.getDataSource()); Assert.assertEquals(task.getInterval(), task2.getInterval()); - final KillTask task3 = (KillTask) jsonMapper.readValue(jsonMapper.writeValueAsString(new ClientKillQuery( - "foo", - new Interval("2010-01-01/P1D") - )), Task.class); + final KillTask task3 = (KillTask) jsonMapper.readValue( + jsonMapper.writeValueAsString( + new ClientKillQuery( + "foo", + new Interval("2010-01-01/P1D") + ) + ), Task.class + ); Assert.assertEquals("foo", task3.getDataSource()); Assert.assertEquals(new Interval("2010-01-01/P1D"), task3.getInterval()); @@ -421,10 +528,14 @@ public class TaskSerdeTest Assert.assertEquals(task.getInterval(), task2.getInterval()); Assert.assertEquals(task.getSegments(), task2.getSegments()); - final AppendTask task3 = (AppendTask) jsonMapper.readValue(jsonMapper.writeValueAsString(new ClientAppendQuery( - "foo", - segments - )), Task.class); + final AppendTask task3 = (AppendTask) jsonMapper.readValue( + jsonMapper.writeValueAsString( + new ClientAppendQuery( + "foo", + segments + ) + ), Task.class + ); Assert.assertEquals("foo", task3.getDataSource()); Assert.assertEquals(new Interval("2010-01-01/P2D"), task3.getInterval()); @@ -521,10 +632,12 @@ public class TaskSerdeTest ); final ConvertSegmentTask convertSegmentTaskOriginal = ConvertSegmentTask.create( segment, - new IndexSpec(new RoaringBitmapSerdeFactory(null), - CompressedObjectStrategy.CompressionStrategy.LZF, - CompressedObjectStrategy.CompressionStrategy.UNCOMPRESSED, - CompressionFactory.LongEncodingStrategy.LONGS), + new IndexSpec( + new RoaringBitmapSerdeFactory(null), + CompressedObjectStrategy.CompressionStrategy.LZF, + CompressedObjectStrategy.CompressionStrategy.UNCOMPRESSED, + CompressionFactory.LongEncodingStrategy.LONGS + ), false, true, null diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 14f77327740..658771872d9 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -651,11 +651,11 @@ public class TaskLifecycleTest ), mapper ), - new IndexTask.IndexIOConfig(new MockFirehoseFactory(false)), - new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec, null, false) + new IndexTask.IndexIOConfig(new MockFirehoseFactory(false), false, null), + new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true) ), - mapper, - null + null, + MAPPER ); final Optional preRunTaskStatus = tsqa.getStatus(indexTask.getId()); @@ -709,11 +709,11 @@ public class TaskLifecycleTest ), mapper ), - new IndexTask.IndexIOConfig(new MockExceptionalFirehoseFactory()), - new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec, null, false) + new IndexTask.IndexIOConfig(new MockExceptionalFirehoseFactory(), false, null), + new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true) ), - mapper, - null + null, + MAPPER ); final TaskStatus status = runTask(indexTask); @@ -1068,11 +1068,11 @@ public class TaskLifecycleTest ), mapper ), - new 
IndexTask.IndexIOConfig(new MockFirehoseFactory(false)), - new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec, null, false) + new IndexTask.IndexIOConfig(new MockFirehoseFactory(false), false, null), + new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, null, false, null, null) ), - mapper, - null + null, + MAPPER ); final long startTime = System.currentTimeMillis(); diff --git a/server/src/main/java/io/druid/guice/FirehoseModule.java b/server/src/main/java/io/druid/guice/FirehoseModule.java index dd8a605af96..2c9fb462d1f 100644 --- a/server/src/main/java/io/druid/guice/FirehoseModule.java +++ b/server/src/main/java/io/druid/guice/FirehoseModule.java @@ -30,6 +30,7 @@ import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory; import io.druid.segment.realtime.firehose.FixedCountFirehoseFactory; import io.druid.segment.realtime.firehose.IrcFirehoseFactory; import io.druid.segment.realtime.firehose.LocalFirehoseFactory; +import io.druid.segment.realtime.firehose.ReplayableFirehoseFactory; import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory; import java.util.Arrays; @@ -56,6 +57,7 @@ public class FirehoseModule implements DruidModule new NamedType(LocalFirehoseFactory.class, "local"), new NamedType(EventReceiverFirehoseFactory.class, "receiver"), new NamedType(CombiningFirehoseFactory.class, "combining"), + new NamedType(ReplayableFirehoseFactory.class, "replayable"), new NamedType(FixedCountFirehoseFactory.class, "fixedCount") ) ); diff --git a/server/src/main/java/io/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java b/server/src/main/java/io/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java index ee197368b73..e946be6d9e2 100644 --- a/server/src/main/java/io/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java +++ b/server/src/main/java/io/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java @@ -194,4 +194,9 @@ public class ArbitraryGranularitySpec implements GranularitySpec result = 31 * result + (timezone != null ? 
timezone.hashCode() : 0); return result; } + + @Override + public GranularitySpec withIntervals(List inputIntervals) { + return new ArbitraryGranularitySpec(queryGranularity, rollup, inputIntervals, timezone); + } } diff --git a/server/src/main/java/io/druid/segment/indexing/granularity/GranularitySpec.java b/server/src/main/java/io/druid/segment/indexing/granularity/GranularitySpec.java index aa4de777f8e..a92bb049433 100644 --- a/server/src/main/java/io/druid/segment/indexing/granularity/GranularitySpec.java +++ b/server/src/main/java/io/druid/segment/indexing/granularity/GranularitySpec.java @@ -66,11 +66,13 @@ public interface GranularitySpec */ public Optional bucketInterval(DateTime dt); - public Granularity getSegmentGranularity(); + Granularity getSegmentGranularity(); - public boolean isRollup(); + boolean isRollup(); - public QueryGranularity getQueryGranularity(); + QueryGranularity getQueryGranularity(); - public String getTimezone(); + String getTimezone(); + + GranularitySpec withIntervals(List inputIntervals); } diff --git a/server/src/main/java/io/druid/segment/indexing/granularity/UniformGranularitySpec.java b/server/src/main/java/io/druid/segment/indexing/granularity/UniformGranularitySpec.java index a70affec76d..bbbc6090390 100644 --- a/server/src/main/java/io/druid/segment/indexing/granularity/UniformGranularitySpec.java +++ b/server/src/main/java/io/druid/segment/indexing/granularity/UniformGranularitySpec.java @@ -189,4 +189,9 @@ public class UniformGranularitySpec implements GranularitySpec result = 31 * result + (wrappedSpec != null ? wrappedSpec.hashCode() : 0); return result; } + + @Override + public GranularitySpec withIntervals(List inputIntervals) { + return new UniformGranularitySpec(segmentGranularity, queryGranularity, rollup, inputIntervals, timezone); + } } diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/ReplayableFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/ReplayableFirehoseFactory.java new file mode 100644 index 00000000000..53a586ce4f2 --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/firehose/ReplayableFirehoseFactory.java @@ -0,0 +1,319 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.segment.realtime.firehose; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.smile.SmileFactory; +import com.fasterxml.jackson.dataformat.smile.SmileGenerator; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.collect.Iterators; +import com.google.common.collect.Ordering; +import com.google.common.io.CountingOutputStream; +import com.google.common.io.Files; +import com.metamx.emitter.EmittingLogger; +import io.druid.data.input.Firehose; +import io.druid.data.input.FirehoseFactory; +import io.druid.data.input.InputRow; +import io.druid.data.input.Row; +import io.druid.data.input.Rows; +import io.druid.data.input.impl.InputRowParser; +import io.druid.guice.annotations.Smile; +import io.druid.java.util.common.parsers.ParseException; +import io.druid.utils.Runnables; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +/** + * Creates a wrapper firehose that writes from another firehose to disk and then serves nextRow() from disk. Useful for + * tasks that require multiple passes through the data to prevent multiple remote fetches. Also has support for + * retrying fetches if the underlying firehose throws an exception while the local cache is being generated. + */ +public class ReplayableFirehoseFactory implements FirehoseFactory +{ + private static final EmittingLogger log = new EmittingLogger(ReplayableFirehoseFactory.class); + private static final boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = false; + private static final int DEFAULT_MAX_TEMP_FILE_SIZE = 250000000; + private static final int DEFAULT_READ_FIREHOSE_RETRIES = 3; + + private final FirehoseFactory delegateFactory; + private final boolean reportParseExceptions; + + // This is *roughly* the max size of the temp files that will be generated, but they may be slightly larger. The + // reason for the approximation is that we're not forcing flushes after writing to the generator so the number of + // bytes written to the stream won't be updated until the flush occurs. It's probably more important to optimize for + // I/O speed rather than maintaining a strict max on the size of the temp file before it's split. 
+ private final int maxTempFileSize; + + private final int readFirehoseRetries; + private final ObjectMapper smileMapper; + + private ReplayableFirehose firehose; + + @JsonCreator + public ReplayableFirehoseFactory( + @JsonProperty("delegate") FirehoseFactory delegateFactory, + @JsonProperty("reportParseExceptions") Boolean reportParseExceptions, + @JsonProperty("maxTempFileSize") Integer maxTempFileSize, + @JsonProperty("readFirehoseRetries") Integer readFirehoseRetries, + @Smile @JacksonInject ObjectMapper smileMapper + ) + { + Preconditions.checkNotNull(delegateFactory, "delegate cannot be null"); + Preconditions.checkArgument( + !(delegateFactory instanceof ReplayableFirehoseFactory), + "Refusing to wrap another ReplayableFirehoseFactory" + ); + + this.delegateFactory = delegateFactory; + this.reportParseExceptions = reportParseExceptions == null + ? DEFAULT_REPORT_PARSE_EXCEPTIONS + : reportParseExceptions; + this.maxTempFileSize = maxTempFileSize == null ? DEFAULT_MAX_TEMP_FILE_SIZE : maxTempFileSize; + this.readFirehoseRetries = readFirehoseRetries == null ? DEFAULT_READ_FIREHOSE_RETRIES : readFirehoseRetries; + + this.smileMapper = smileMapper; + + log.info(this.toString()); + } + + @Override + public Firehose connect(InputRowParser parser) throws IOException + { + if (firehose == null) { + firehose = new ReplayableFirehose(parser); + } else { + log.info("Rewinding and returning existing firehose"); + firehose.rewind(); + } + + return firehose; + } + + public class ReplayableFirehose implements Firehose + { + private final List files = new ArrayList<>(); + private final List dimensions; + + private int fileIndex = 0; + private JsonFactory jsonFactory; + private JsonParser jsonParser; + private Iterator it; + + public ReplayableFirehose(InputRowParser parser) throws IOException + { + jsonFactory = smileMapper.getFactory(); + + if (jsonFactory instanceof SmileFactory) { + jsonFactory = ((SmileFactory) jsonFactory).enable(SmileGenerator.Feature.CHECK_SHARED_STRING_VALUES); + } + + long counter = 0, totalBytes = 0, unparseable = 0, retryCount = 0; + Set dimensionScratch = new HashSet<>(); + + File tmpDir = Files.createTempDir(); + tmpDir.deleteOnExit(); + + long startTime = System.nanoTime(); + boolean isDone = false; + do { + deleteTempFiles(); + try (Firehose delegateFirehose = delegateFactory.connect(parser)) { + while (delegateFirehose.hasMore()) { + File tmpFile = File.createTempFile("replayable-", null, tmpDir); + tmpFile.deleteOnExit(); + + files.add(tmpFile); + log.debug("Created file [%s]", tmpFile.getAbsolutePath()); + + try (CountingOutputStream cos = new CountingOutputStream(new FileOutputStream(tmpFile)); + JsonGenerator generator = jsonFactory.createGenerator(cos)) { + + while (delegateFirehose.hasMore() && cos.getCount() < getMaxTempFileSize()) { + try { + InputRow row = delegateFirehose.nextRow(); + generator.writeObject(row); + dimensionScratch.addAll(row.getDimensions()); + counter++; + } + catch (ParseException e) { + if (reportParseExceptions) { + throw e; + } + unparseable++; + } + } + + totalBytes += cos.getCount(); + } + } + isDone = true; + } + catch (Exception e) { + if (++retryCount <= readFirehoseRetries && !(e instanceof ParseException)) { + log.error(e, "Delegate firehose threw an exception, retrying (%d of %d)", retryCount, readFirehoseRetries); + } else { + log.error(e, "Delegate firehose threw an exception, retries exhausted, aborting"); + Throwables.propagate(e); + } + } + } while (!isDone); + + log.info( + "Finished reading from firehose in [%,dms], 
[%,d] events parsed, [%,d] bytes written, [%,d] events unparseable", + (System.nanoTime() - startTime) / 1000000, + counter, + totalBytes, + unparseable + ); + + dimensions = Ordering.natural().immutableSortedCopy(dimensionScratch); + + if (counter == 0) { + log.warn("Firehose contains no events!"); + deleteTempFiles(); + it = Iterators.emptyIterator(); + } else { + jsonParser = jsonFactory.createParser(files.get(fileIndex)); + it = jsonParser.readValuesAs(Row.class); + } + } + + @Override + public boolean hasMore() + { + if (it.hasNext()) { + return true; + } + + try { + if (jsonParser != null) { + jsonParser.close(); + } + + if (++fileIndex >= files.size() || files.get(fileIndex).length() == 0) { + return false; + } + + jsonParser = jsonFactory.createParser(files.get(fileIndex)); + it = jsonParser.readValuesAs(Row.class); + return true; + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public InputRow nextRow() + { + return Rows.toCaseInsensitiveInputRow(it.next(), dimensions); + } + + @Override + public Runnable commit() + { + return Runnables.getNoopRunnable(); + } + + /** + * Closes the firehose by closing the input stream and setting an empty iterator. The underlying cache files + * backing the firehose are retained for when the firehose is "replayed" by calling rewind(). The cache files are + * deleted by File.deleteOnExit() when the process exits. + */ + @Override + public void close() throws IOException + { + if (jsonParser != null) { + jsonParser.close(); + } + it = Iterators.emptyIterator(); + } + + private void rewind() throws IOException + { + close(); + + if (!files.isEmpty()) { + fileIndex = 0; + jsonParser = jsonFactory.createParser(files.get(fileIndex)); + it = jsonParser.readValuesAs(Row.class); + } + } + + private void deleteTempFiles() + { + for (File file : files) { + log.debug("Deleting temp file: %s", file.getAbsolutePath()); + file.delete(); + } + + files.clear(); + } + } + + @JsonProperty("delegate") + public FirehoseFactory getDelegateFactory() + { + return delegateFactory; + } + + @JsonProperty("reportParseExceptions") + public boolean isReportParseExceptions() + { + return reportParseExceptions; + } + + @JsonProperty("maxTempFileSize") + public int getMaxTempFileSize() + { + return maxTempFileSize; + } + + @JsonProperty("readFirehoseRetries") + public int getReadFirehoseRetries() + { + return readFirehoseRetries; + } + + @Override + public String toString() + { + return "ReplayableFirehoseFactory{" + + "delegateFactory=" + delegateFactory + + ", reportParseExceptions=" + reportParseExceptions + + ", maxTempFileSize=" + maxTempFileSize + + ", readFirehoseRetries=" + readFirehoseRetries + + '}'; + } +} diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/NoopSegmentHandoffNotifierFactory.java b/server/src/main/java/io/druid/segment/realtime/plumber/NoopSegmentHandoffNotifierFactory.java new file mode 100644 index 00000000000..5144458c6b9 --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/plumber/NoopSegmentHandoffNotifierFactory.java @@ -0,0 +1,53 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.realtime.plumber; + +import io.druid.java.util.common.logger.Logger; +import io.druid.query.SegmentDescriptor; + +import java.util.concurrent.Executor; + +public class NoopSegmentHandoffNotifierFactory implements SegmentHandoffNotifierFactory +{ + private static final Logger log = new Logger(NoopSegmentHandoffNotifierFactory.class); + + private static final SegmentHandoffNotifier NOTIFIER = new SegmentHandoffNotifier() + { + @Override + public boolean registerSegmentHandoffCallback(SegmentDescriptor descriptor, Executor exec, Runnable handOffRunnable) + { + log.info("Not waiting for segment to be handed off, executing handOffRunnable"); + exec.execute(handOffRunnable); + return true; + } + + @Override + public void start() {} + + @Override + public void close() {} + }; + + @Override + public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource) + { + return NOTIFIER; + } +} diff --git a/server/src/test/java/io/druid/realtime/firehose/ReplayableFirehoseFactoryTest.java b/server/src/test/java/io/druid/realtime/firehose/ReplayableFirehoseFactoryTest.java new file mode 100644 index 00000000000..f1631049c4c --- /dev/null +++ b/server/src/test/java/io/druid/realtime/firehose/ReplayableFirehoseFactoryTest.java @@ -0,0 +1,447 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.realtime.firehose; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import io.druid.data.input.Firehose; +import io.druid.data.input.FirehoseFactory; +import io.druid.data.input.InputRow; +import io.druid.data.input.MapBasedInputRow; +import io.druid.data.input.impl.InputRowParser; +import io.druid.data.input.impl.MapInputRowParser; +import io.druid.data.input.impl.TimeAndDimsParseSpec; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.java.util.common.parsers.ParseException; +import io.druid.segment.realtime.firehose.ReplayableFirehoseFactory; +import org.easymock.EasyMockSupport; +import org.easymock.IAnswer; +import org.joda.time.DateTime; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; + +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; + +public class ReplayableFirehoseFactoryTest extends EasyMockSupport +{ + private FirehoseFactory delegateFactory = createMock(FirehoseFactory.class); + private Firehose delegateFirehose = createMock(Firehose.class); + private InputRowParser parser = new MapInputRowParser(new TimeAndDimsParseSpec(null, null)); + private ObjectMapper mapper = new DefaultObjectMapper(); + + + private List testRows = Lists.newArrayList( + new MapBasedInputRow( + DateTime.now(), Lists.newArrayList("dim1", "dim2"), + ImmutableMap.of("dim1", "val1", "dim2", "val2", "met1", 1) + ), + new MapBasedInputRow( + DateTime.now(), Lists.newArrayList("dim1", "dim2"), + ImmutableMap.of("dim1", "val5", "dim2", "val2", "met1", 2) + ), + new MapBasedInputRow( + DateTime.now(), Lists.newArrayList("dim2", "dim3"), + ImmutableMap.of("dim2", "val1", "dim3", "val2", "met1", 3) + ) + ); + + private ReplayableFirehoseFactory replayableFirehoseFactory; + + @Before + public void setup() + { + replayableFirehoseFactory = new ReplayableFirehoseFactory( + delegateFactory, + true, + 10000, + 3, + mapper + ); + } + + @Test + public void testConstructor() throws Exception + { + Assert.assertEquals(delegateFactory, replayableFirehoseFactory.getDelegateFactory()); + Assert.assertEquals(10000, replayableFirehoseFactory.getMaxTempFileSize()); + Assert.assertEquals(3, replayableFirehoseFactory.getReadFirehoseRetries()); + Assert.assertEquals(true, replayableFirehoseFactory.isReportParseExceptions()); + } + + @Test + public void testReplayableFirehoseNoEvents() throws Exception + { + expect(delegateFactory.connect(parser)).andReturn(delegateFirehose); + expect(delegateFirehose.hasMore()).andReturn(false); + delegateFirehose.close(); + replayAll(); + + try (Firehose firehose = replayableFirehoseFactory.connect(parser)) { + Assert.assertFalse(firehose.hasMore()); + } + verifyAll(); + } + + @Test + public void testReplayableFirehoseWithEvents() throws Exception + { + final boolean hasMore[] = {true}; + + expect(delegateFactory.connect(parser)).andReturn(delegateFirehose); + expect(delegateFirehose.hasMore()).andAnswer( + new IAnswer() + { + @Override + public Boolean answer() throws Throwable + { + return hasMore[0]; + } + } + ).anyTimes(); + expect(delegateFirehose.nextRow()) + .andReturn(testRows.get(0)) + .andReturn(testRows.get(1)) + .andAnswer( + new IAnswer() + { + @Override + public InputRow answer() throws Throwable + { + hasMore[0] = false; + return testRows.get(2); + } + } + ); + + delegateFirehose.close(); + replayAll(); + + List rows = 
Lists.newArrayList(); + try (Firehose firehose = replayableFirehoseFactory.connect(parser)) { + while (firehose.hasMore()) { + rows.add(firehose.nextRow()); + } + } + Assert.assertEquals(testRows, rows); + + // now replay! + rows.clear(); + try (Firehose firehose = replayableFirehoseFactory.connect(parser)) { + while (firehose.hasMore()) { + rows.add(firehose.nextRow()); + } + } + Assert.assertEquals(testRows, rows); + + verifyAll(); + } + + @Test + public void testReplayableFirehoseWithoutReportParseExceptions() throws Exception + { + final boolean hasMore[] = {true}; + replayableFirehoseFactory = new ReplayableFirehoseFactory( + delegateFactory, + false, + 10000, + 3, + mapper + ); + + expect(delegateFactory.connect(parser)).andReturn(delegateFirehose); + expect(delegateFirehose.hasMore()).andAnswer( + new IAnswer() + { + @Override + public Boolean answer() throws Throwable + { + return hasMore[0]; + } + } + ).anyTimes(); + expect(delegateFirehose.nextRow()) + .andReturn(testRows.get(0)) + .andReturn(testRows.get(1)) + .andThrow(new ParseException("unparseable!")) + .andAnswer( + new IAnswer() + { + @Override + public InputRow answer() throws Throwable + { + hasMore[0] = false; + return testRows.get(2); + } + } + ); + + delegateFirehose.close(); + replayAll(); + + List rows = Lists.newArrayList(); + try (Firehose firehose = replayableFirehoseFactory.connect(parser)) { + while (firehose.hasMore()) { + rows.add(firehose.nextRow()); + } + } + Assert.assertEquals(testRows, rows); + + verifyAll(); + } + + @Test(expected = ParseException.class) + public void testReplayableFirehoseWithReportParseExceptions() throws Exception + { + final boolean hasMore[] = {true}; + + expect(delegateFactory.connect(parser)).andReturn(delegateFirehose); + expect(delegateFirehose.hasMore()).andAnswer( + new IAnswer() + { + @Override + public Boolean answer() throws Throwable + { + return hasMore[0]; + } + } + ).anyTimes(); + expect(delegateFirehose.nextRow()) + .andReturn(testRows.get(0)) + .andReturn(testRows.get(1)) + .andThrow(new ParseException("unparseable!")) + .andAnswer( + new IAnswer() + { + @Override + public InputRow answer() throws Throwable + { + hasMore[0] = false; + return testRows.get(2); + } + } + ); + + delegateFirehose.close(); + replayAll(); + + replayableFirehoseFactory.connect(parser); + verifyAll(); + } + + @Test + public void testReplayableFirehoseWithConnectRetries() throws Exception + { + final boolean hasMore[] = {true}; + + expect(delegateFactory.connect(parser)).andThrow(new IOException()) + .andReturn(delegateFirehose); + expect(delegateFirehose.hasMore()).andAnswer( + new IAnswer() + { + @Override + public Boolean answer() throws Throwable + { + return hasMore[0]; + } + } + ).anyTimes(); + expect(delegateFirehose.nextRow()) + .andReturn(testRows.get(0)) + .andReturn(testRows.get(1)) + .andAnswer( + new IAnswer() + { + @Override + public InputRow answer() throws Throwable + { + hasMore[0] = false; + return testRows.get(2); + } + } + ); + delegateFirehose.close(); + replayAll(); + + List rows = Lists.newArrayList(); + try (Firehose firehose = replayableFirehoseFactory.connect(parser)) { + while (firehose.hasMore()) { + rows.add(firehose.nextRow()); + } + } + Assert.assertEquals(testRows, rows); + + verifyAll(); + } + + @Test + public void testReplayableFirehoseWithNextRowRetries() throws Exception + { + final boolean hasMore[] = {true}; + + expect(delegateFactory.connect(parser)).andReturn(delegateFirehose).times(2); + expect(delegateFirehose.hasMore()).andAnswer( + new IAnswer() + { 
+ @Override + public Boolean answer() throws Throwable + { + return hasMore[0]; + } + } + ).anyTimes(); + expect(delegateFirehose.nextRow()) + .andReturn(testRows.get(0)) + .andThrow(new RuntimeException()) + .andReturn(testRows.get(0)) + .andReturn(testRows.get(1)) + .andAnswer( + new IAnswer() + { + @Override + public InputRow answer() throws Throwable + { + hasMore[0] = false; + return testRows.get(2); + } + } + ); + delegateFirehose.close(); + expectLastCall().times(2); + replayAll(); + + List rows = Lists.newArrayList(); + try (Firehose firehose = replayableFirehoseFactory.connect(parser)) { + while (firehose.hasMore()) { + rows.add(firehose.nextRow()); + } + } + Assert.assertEquals(testRows, rows); + + verifyAll(); + } + + @Test(expected = TestReadingException.class) + public void testReplayableFirehoseWithNoRetries() throws Exception + { + replayableFirehoseFactory = new ReplayableFirehoseFactory( + delegateFactory, + false, + 10000, + 0, + mapper + ); + + expect(delegateFactory.connect(parser)).andReturn(delegateFirehose); + expect(delegateFirehose.hasMore()).andReturn(true).times(2); + expect(delegateFirehose.nextRow()).andThrow(new TestReadingException()); + + delegateFirehose.close(); + expectLastCall(); + replayAll(); + + replayableFirehoseFactory.connect(parser); + verifyAll(); + } + + @Test + public void testReplayableFirehoseWithMultipleFiles() throws Exception + { + replayableFirehoseFactory = new ReplayableFirehoseFactory(delegateFactory, false, 1, 3, mapper); + + final boolean hasMore[] = {true}; + final int multiplicationFactor = 500; + + final InputRow finalRow = new MapBasedInputRow( + DateTime.now(), Lists.newArrayList("dim4", "dim5"), + ImmutableMap.of("dim4", "val12", "dim5", "val20", "met1", 30) + ); + + expect(delegateFactory.connect(parser)).andReturn(delegateFirehose); + expect(delegateFirehose.hasMore()).andAnswer( + new IAnswer() + { + @Override + public Boolean answer() throws Throwable + { + return hasMore[0]; + } + } + ).anyTimes(); + + expect(delegateFirehose.nextRow()) + .andReturn(testRows.get(0)).times(multiplicationFactor) + .andReturn(testRows.get(1)).times(multiplicationFactor) + .andReturn(testRows.get(2)).times(multiplicationFactor) + .andAnswer( + new IAnswer() + { + @Override + public InputRow answer() throws Throwable + { + hasMore[0] = false; + return finalRow; + } + } + ); + + delegateFirehose.close(); + replayAll(); + + List testRowsMultiplied = Lists.newArrayList(); + for (InputRow row : testRows) { + for (int i = 0; i < multiplicationFactor; i++) { + testRowsMultiplied.add(row); + } + } + testRowsMultiplied.add(finalRow); + + List rows = Lists.newArrayList(); + try (Firehose firehose = replayableFirehoseFactory.connect(parser)) { + while (firehose.hasMore()) { + rows.add(firehose.nextRow()); + } + } + Assert.assertEquals(testRowsMultiplied, rows); + + // now replay! + rows.clear(); + try (Firehose firehose = replayableFirehoseFactory.connect(parser)) { + while (firehose.hasMore()) { + rows.add(firehose.nextRow()); + } + } + Assert.assertEquals(testRowsMultiplied, rows); + + verifyAll(); + } + + private class TestReadingException extends RuntimeException + { + } +} + + +
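A minimal usage sketch of the new ReplayableFirehoseFactory introduced by this patch (illustrative only, not part of the diff): the directory, file pattern, CSV columns, and the use of DefaultObjectMapper in place of the injected Smile mapper are assumptions borrowed from IndexTaskTest and ReplayableFirehoseFactoryTest above. It shows two passes over the same data; the first pass reads the delegate firehose and caches rows to local temp files, and the second call to connect() rewinds and replays the cache instead of re-reading the delegate.

import com.google.common.collect.Lists;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.CSVParseSpec;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
import io.druid.segment.realtime.firehose.ReplayableFirehoseFactory;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;

public class ReplayableFirehoseExample
{
  public static void main(String[] args) throws IOException
  {
    // CSV parser matching the "ts,dim,val" layout used by the tests in this patch.
    StringInputRowParser parser = new StringInputRowParser(
        new CSVParseSpec(
            new TimestampSpec("ts", "auto", null),
            new DimensionsSpec(
                DimensionsSpec.getDefaultSchemas(Arrays.asList("dim")),
                Lists.newArrayList(),
                Lists.newArrayList()
            ),
            null,
            Arrays.asList("ts", "dim", "val")
        ),
        null
    );

    // Delegate firehose reading local files; the path and pattern are illustrative.
    FirehoseFactory delegate = new LocalFirehoseFactory(new File("/tmp/example"), "*.csv", null);

    // Wrap the delegate so its rows are cached to temp files on the first read and can be replayed.
    ReplayableFirehoseFactory replayable = new ReplayableFirehoseFactory(
        delegate,
        false,      // reportParseExceptions: skip unparseable rows instead of failing
        250000000,  // maxTempFileSize: approximate cap on each temp cache file, in bytes
        3,          // readFirehoseRetries: retries if the delegate fails while the cache is built
        new DefaultObjectMapper()
    );

    // First pass: rows come from the delegate and are written to the local cache.
    try (Firehose firehose = replayable.connect(parser)) {
      while (firehose.hasMore()) {
        InputRow row = firehose.nextRow();
        // e.g. determine intervals and shard counts
      }
    }

    // Second pass: connect() on the same instance rewinds and serves the cached rows.
    try (Firehose firehose = replayable.connect(parser)) {
      while (firehose.hasMore()) {
        InputRow row = firehose.nextRow();
        // e.g. build and push segments
      }
    }
  }
}

The cache files are retained across close() so a later connect() can rewind rather than re-fetch from the delegate; they are removed by File.deleteOnExit() when the JVM exits.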