IndexTask improvements (#3611)

* index task improvements

* code review changes

* add null check
This commit is contained in:
David Lim 2017-01-18 15:24:37 -07:00 committed by Fangjin Yang
parent 7004f5d499
commit ff52581bd3
15 changed files with 1788 additions and 629 deletions

View File

@ -53,7 +53,7 @@ public interface Firehose extends Closeable
*
* @return The next row
*/
public InputRow nextRow() ;
public InputRow nextRow();
/**
* Returns a runnable that will "commit" everything read up to the point at which commit() is called. This is

View File

@ -187,7 +187,7 @@ This spec is used to generated segments with uniform intervals.
| segmentGranularity | string | The granularity to create segments at. | no (default == 'DAY') |
| queryGranularity | string | The minimum granularity to be able to query results at and the granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows. | no (default == 'NONE') |
| rollup | boolean | rollup or not | no (default == true) |
| intervals | string | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | yes for batch, no for real-time |
| intervals | string | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | yes for Hadoop ingestion, no otherwise |
| timezone | string | The timezone to represent the interval offsets in. Only valid if intervals are explicitly specified for batch ingestion. Will not be valid for kafka based ingestion. | no (default == 'UTC')
### Arbitrary Granularity Spec

View File

@ -76,9 +76,8 @@ The Index Task is a simpler variation of the Index Hadoop task that is designed
},
"tuningConfig" : {
"type" : "index",
"targetPartitionSize" : -1,
"rowFlushBoundary" : 0,
"numShards": 1
"targetPartitionSize" : 5000000,
"maxRowsInMemory" : 75000
}
}
}
@ -100,7 +99,12 @@ See [Ingestion](../ingestion/index.html)
#### IOConfig
This field is required. You can specify a type of [Firehose](../ingestion/firehose.html) here.
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|The task type, this should always be "index".|none|yes|
|firehose|Specify a [Firehose](../ingestion/firehose.html) here.|none|yes|
|appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This will only work if the existing segment set has extendable-type shardSpecs (which can be forced by setting 'forceExtendableShardSpecs' in the tuning config).|false|no|
|skipFirehoseCaching|By default the IndexTask will fully read the supplied firehose to disk before processing the data. This prevents the task from doing multiple remote fetches and enforces determinism if more than one pass through the data is required. It also allows the task to retry fetching the data if the firehose throws an exception during reading. This requires sufficient disk space for the temporary cache.|false|no|
#### TuningConfig
@ -108,13 +112,15 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|The task type, this should always be "index".|None.|yes|
|targetPartitionSize|Used in sharding. Determines how many rows are in each segment. Set this to -1 to use numShards instead for sharding.|5000000|no|
|rowFlushBoundary|Used in determining when intermediate persist should occur to disk.|75000|no|
|numShards|Directly specify the number of shards to create. You can skip the intermediate persist step if you specify the number of shards you want and set targetPartitionSize=-1.|null|no|
|type|The task type, this should always be "index".|none|yes|
|targetPartitionSize|Used in sharding. Determines how many rows are in each segment.|5000000|no|
|maxRowsInMemory|Used in determining when intermediate persists to disk should occur.|75000|no|
|numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if targetPartitionSize is set.|null|no|
|indexSpec|defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no|
|maxPendingPersists|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)|no|
|buildV9Directly|Whether to build a v9 index directly instead of first building a v8 index and then converting it to v9 format.|true|no|
|forceExtendableShardSpecs|Forces use of extendable shardSpecs. Experimental feature intended for use with the [Kafka indexing service extension](../development/extensions-core/kafka-ingestion.html).|false|no|
|reportParseExceptions|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|false|no|
#### IndexSpec

View File

@ -24,6 +24,7 @@ import com.google.common.collect.Sets;
import com.google.inject.Inject;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.task.Task;
import io.druid.timeline.DataSegment;
@ -80,6 +81,8 @@ public class TaskStorageQueryAdapter
for (final TaskAction action : storage.getAuditLogs(taskid)) {
if (action instanceof SegmentInsertAction) {
segments.addAll(((SegmentInsertAction) action).getSegments());
} else if (action instanceof SegmentTransactionalInsertAction) {
segments.addAll(((SegmentTransactionalInsertAction) action).getSegments());
}
}
return segments;

View File

@ -21,7 +21,6 @@ package io.druid.indexing.common.task;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import io.druid.data.input.impl.CSVParseSpec;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.SpatialDimensionSchema;
@ -31,9 +30,13 @@ import io.druid.granularity.QueryGranularities;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.TestUtils;
import io.druid.indexing.common.actions.LockAcquireAction;
import io.druid.indexing.common.actions.LockListAction;
import io.druid.indexing.common.actions.SegmentAllocateAction;
import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.overlord.SegmentPublishResult;
import io.druid.java.util.common.Granularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
@ -42,13 +45,15 @@ import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.appenderator.SegmentIdentifier;
import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.NumberedShardSpec;
import io.druid.timeline.partition.ShardSpec;
import org.joda.time.DateTime;
@ -75,6 +80,7 @@ public class IndexTaskTest
private IndexMerger indexMerger;
private IndexMergerV9 indexMergerV9;
private IndexIO indexIO;
private volatile int segmentAllocatePartitionCounter;
public IndexTaskTest()
{
@ -102,57 +108,9 @@ public class IndexTaskTest
IndexTask indexTask = new IndexTask(
null,
null,
new IndexTask.IndexIngestionSpec(
new DataSchema(
"test",
jsonMapper.convertValue(
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec(
"ts",
"auto",
null
),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(Arrays.asList("ts")),
Lists.<String>newArrayList(),
Lists.<SpatialDimensionSchema>newArrayList()
),
null,
Arrays.asList("ts", "dim", "val")
),
null
),
Map.class
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("val", "val")
},
new UniformGranularitySpec(
Granularity.DAY,
QueryGranularities.MINUTE,
Arrays.asList(new Interval("2014/2015"))
),
jsonMapper
),
new IndexTask.IndexIOConfig(
new LocalFirehoseFactory(
tmpDir,
"druid*",
null
)
),
new IndexTask.IndexTuningConfig(
2,
0,
null,
indexSpec,
null,
false
)
),
jsonMapper,
null
createIngestionSpec(tmpDir, null, 2, null, false, false),
null,
jsonMapper
);
final List<DataSegment> segments = runTask(indexTask);
@ -188,57 +146,9 @@ public class IndexTaskTest
IndexTask indexTask = new IndexTask(
null,
null,
new IndexTask.IndexIngestionSpec(
new DataSchema(
"test",
jsonMapper.convertValue(
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec(
"ts",
"auto",
null
),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(Arrays.asList("ts")),
Lists.<String>newArrayList(),
Lists.<SpatialDimensionSchema>newArrayList()
),
null,
Arrays.asList("ts", "dim", "val")
),
null
),
Map.class
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("val", "val")
},
new UniformGranularitySpec(
Granularity.DAY,
QueryGranularities.MINUTE,
Arrays.asList(new Interval("2014/2015"))
),
jsonMapper
),
new IndexTask.IndexIOConfig(
new LocalFirehoseFactory(
tmpDir,
"druid*",
null
)
),
new IndexTask.IndexTuningConfig(
2,
0,
null,
indexSpec,
null,
true
)
),
jsonMapper,
null
createIngestionSpec(tmpDir, null, 2, null, true, false),
null,
jsonMapper
);
final List<DataSegment> segments = runTask(indexTask);
@ -274,49 +184,19 @@ public class IndexTaskTest
IndexTask indexTask = new IndexTask(
null,
null,
new IndexTask.IndexIngestionSpec(
new DataSchema(
"test",
jsonMapper.convertValue(
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec(
"ts",
"auto",
null
),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(Arrays.asList("ts")),
Lists.<String>newArrayList(),
Lists.<SpatialDimensionSchema>newArrayList()
),
null,
Arrays.asList("ts", "dim", "val")
),
null
),
Map.class
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("val", "val")
},
new ArbitraryGranularitySpec(
QueryGranularities.MINUTE,
Arrays.asList(new Interval("2014/2015"))
),
jsonMapper
createIngestionSpec(
tmpDir,
new ArbitraryGranularitySpec(
QueryGranularities.MINUTE,
Arrays.asList(new Interval("2014/2015"))
),
new IndexTask.IndexIOConfig(
new LocalFirehoseFactory(
tmpDir,
"druid*",
null
)
),
null
10,
null,
false,
false
),
jsonMapper,
null
null,
jsonMapper
);
List<DataSegment> segments = runTask(indexTask);
@ -324,6 +204,160 @@ public class IndexTaskTest
Assert.assertEquals(1, segments.size());
}
@Test
public void testIntervalBucketing() throws Exception
{
File tmpDir = temporaryFolder.newFolder();
File tmpFile = File.createTempFile("druid", "index", tmpDir);
PrintWriter writer = new PrintWriter(tmpFile);
writer.println("2015-03-01T07:59:59.977Z,a,1");
writer.println("2015-03-01T08:00:00.000Z,b,1");
writer.close();
IndexTask indexTask = new IndexTask(
null,
null,
createIngestionSpec(
tmpDir,
new UniformGranularitySpec(
Granularity.HOUR,
QueryGranularities.HOUR,
Arrays.asList(new Interval("2015-03-01T08:00:00Z/2015-03-01T09:00:00Z"))
),
50,
null,
false,
false
),
null,
jsonMapper
);
final List<DataSegment> segments = runTask(indexTask);
Assert.assertEquals(1, segments.size());
}
@Test
public void testNumShardsProvided() throws Exception
{
File tmpDir = temporaryFolder.newFolder();
File tmpFile = File.createTempFile("druid", "index", tmpDir);
PrintWriter writer = new PrintWriter(tmpFile);
writer.println("2014-01-01T00:00:10Z,a,1");
writer.println("2014-01-01T01:00:20Z,b,1");
writer.println("2014-01-01T02:00:30Z,c,1");
writer.close();
IndexTask indexTask = new IndexTask(
null,
null,
createIngestionSpec(tmpDir, null, null, 1, false, false),
null,
jsonMapper
);
final List<DataSegment> segments = runTask(indexTask);
Assert.assertEquals(1, segments.size());
Assert.assertEquals("test", segments.get(0).getDataSource());
Assert.assertEquals(new Interval("2014/P1D"), segments.get(0).getInterval());
Assert.assertTrue(segments.get(0).getShardSpec().getClass().equals(NoneShardSpec.class));
Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum());
}
@Test
public void testAppendToExisting() throws Exception
{
segmentAllocatePartitionCounter = 0;
File tmpDir = temporaryFolder.newFolder();
File tmpFile = File.createTempFile("druid", "index", tmpDir);
PrintWriter writer = new PrintWriter(tmpFile);
writer.println("2014-01-01T00:00:10Z,a,1");
writer.println("2014-01-01T01:00:20Z,b,1");
writer.println("2014-01-01T02:00:30Z,c,1");
writer.close();
IndexTask indexTask = new IndexTask(
null,
null,
createIngestionSpec(tmpDir, null, 2, null, false, true),
null,
jsonMapper
);
final List<DataSegment> segments = runTask(indexTask);
Assert.assertEquals(2, segmentAllocatePartitionCounter);
Assert.assertEquals(2, segments.size());
Assert.assertEquals("test", segments.get(0).getDataSource());
Assert.assertEquals(new Interval("2014/P1D"), segments.get(0).getInterval());
Assert.assertTrue(segments.get(0).getShardSpec().getClass().equals(NumberedShardSpec.class));
Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum());
Assert.assertEquals("test", segments.get(1).getDataSource());
Assert.assertEquals(new Interval("2014/P1D"), segments.get(1).getInterval());
Assert.assertTrue(segments.get(1).getShardSpec().getClass().equals(NumberedShardSpec.class));
Assert.assertEquals(1, segments.get(1).getShardSpec().getPartitionNum());
}
@Test
public void testIntervalNotSpecified() throws Exception
{
File tmpDir = temporaryFolder.newFolder();
File tmpFile = File.createTempFile("druid", "index", tmpDir);
PrintWriter writer = new PrintWriter(tmpFile);
writer.println("2014-01-01T00:00:10Z,a,1");
writer.println("2014-01-01T01:00:20Z,b,1");
writer.println("2014-01-01T02:00:30Z,c,1");
writer.close();
IndexTask indexTask = new IndexTask(
null,
null,
createIngestionSpec(
tmpDir,
new UniformGranularitySpec(
Granularity.HOUR,
QueryGranularities.MINUTE,
null
),
2,
null,
false,
false
),
null,
jsonMapper
);
final List<DataSegment> segments = runTask(indexTask);
Assert.assertEquals(3, segments.size());
Assert.assertEquals("test", segments.get(0).getDataSource());
Assert.assertEquals(new Interval("2014-01-01T00/PT1H"), segments.get(0).getInterval());
Assert.assertTrue(segments.get(0).getShardSpec().getClass().equals(NoneShardSpec.class));
Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum());
Assert.assertEquals("test", segments.get(1).getDataSource());
Assert.assertEquals(new Interval("2014-01-01T01/PT1H"), segments.get(1).getInterval());
Assert.assertTrue(segments.get(1).getShardSpec().getClass().equals(NoneShardSpec.class));
Assert.assertEquals(0, segments.get(1).getShardSpec().getPartitionNum());
Assert.assertEquals("test", segments.get(2).getDataSource());
Assert.assertEquals(new Interval("2014-01-01T02/PT1H"), segments.get(2).getInterval());
Assert.assertTrue(segments.get(2).getShardSpec().getClass().equals(NoneShardSpec.class));
Assert.assertEquals(0, segments.get(2).getShardSpec().getPartitionNum());
}
private final List<DataSegment> runTask(final IndexTask indexTask) throws Exception
{
final List<DataSegment> segments = Lists.newArrayList();
@ -342,6 +376,30 @@ public class IndexTaskTest
)
);
}
if (taskAction instanceof LockAcquireAction) {
return (RetType) new TaskLock(
"groupId",
"test",
((LockAcquireAction) taskAction).getInterval(),
new DateTime().toString()
);
}
if (taskAction instanceof SegmentTransactionalInsertAction) {
return (RetType) new SegmentPublishResult(
((SegmentTransactionalInsertAction) taskAction).getSegments(),
true
);
}
if (taskAction instanceof SegmentAllocateAction) {
SegmentAllocateAction action = (SegmentAllocateAction) taskAction;
Interval interval = action.getPreferredSegmentGranularity().bucket(action.getTimestamp());
ShardSpec shardSpec = new NumberedShardSpec(segmentAllocatePartitionCounter++, 0);
return (RetType) new SegmentIdentifier(action.getDataSource(), interval, "latestVersion", shardSpec);
}
return null;
}
}, null, new DataSegmentPusher()
@ -365,7 +423,7 @@ public class IndexTaskTest
segments.add(segment);
return segment;
}
}, null, null, null, null, null, null, null, null, null, null, temporaryFolder.newFolder(),
}, null, null, null, null, null, null, null, null, null, jsonMapper, temporaryFolder.newFolder(),
indexMerger, indexIO, null, null, indexMergerV9
)
);
@ -375,93 +433,66 @@ public class IndexTaskTest
return segments;
}
@Test
public void testIntervalBucketing() throws Exception
private IndexTask.IndexIngestionSpec createIngestionSpec(
File baseDir,
GranularitySpec granularitySpec,
Integer targetPartitionSize,
Integer numShards,
boolean forceExtendableShardSpecs,
boolean appendToExisting
)
{
File tmpDir = temporaryFolder.newFolder();
File tmpFile = File.createTempFile("druid", "index", tmpDir);
PrintWriter writer = new PrintWriter(tmpFile);
writer.println("2015-03-01T07:59:59.977Z,a,1");
writer.println("2015-03-01T08:00:00.000Z,b,1");
writer.close();
IndexTask indexTask = new IndexTask(
null,
null,
new IndexTask.IndexIngestionSpec(
new DataSchema(
"test",
jsonMapper.convertValue(
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec(
"ts",
"auto",
null
),
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(Arrays.asList("dim")),
Lists.<String>newArrayList(),
Lists.<SpatialDimensionSchema>newArrayList()
),
null,
Arrays.asList("ts", "dim", "val")
return new IndexTask.IndexIngestionSpec(
new DataSchema(
"test",
jsonMapper.convertValue(
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec(
"ts",
"auto",
null
),
null
new DimensionsSpec(
DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim")),
Lists.<String>newArrayList(),
Lists.<SpatialDimensionSchema>newArrayList()
),
null,
Arrays.asList("ts", "dim", "val")
),
Map.class
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("val", "val")
},
new UniformGranularitySpec(
Granularity.HOUR,
QueryGranularities.HOUR,
Arrays.asList(new Interval("2015-03-01T08:00:00Z/2015-03-01T09:00:00Z"))
),
jsonMapper
),
new IndexTask.IndexIOConfig(
new LocalFirehoseFactory(
tmpDir,
"druid*",
null
)
),
Map.class
),
null
new AggregatorFactory[]{
new LongSumAggregatorFactory("val", "val")
},
granularitySpec != null ? granularitySpec : new UniformGranularitySpec(
Granularity.DAY,
QueryGranularities.MINUTE,
Arrays.asList(new Interval("2014/2015"))
),
jsonMapper
),
jsonMapper,
null
new IndexTask.IndexIOConfig(
new LocalFirehoseFactory(
baseDir,
"druid*",
null
), appendToExisting, null
),
new IndexTask.IndexTuningConfig(
targetPartitionSize,
1,
null,
numShards,
indexSpec,
null,
true,
forceExtendableShardSpecs,
true
)
);
final List<DataSegment> segments = runTask(indexTask);
Assert.assertEquals(1, segments.size());
}
@Test
public void testConvertProps()
{
ShardSpec spec = new NumberedShardSpec(1, 2);
IndexTask.IndexTuningConfig config = new IndexTask.IndexTuningConfig(
100,
1000,
null,
new IndexSpec(),
null,
false
);
RealtimeTuningConfig realtimeTuningConfig = IndexTask.convertTuningConfig(
spec,
config.getRowFlushBoundary(),
config.getIndexSpec(),
config.getBuildV9Directly()
);
Assert.assertEquals(realtimeTuningConfig.getMaxRowsInMemory(), config.getRowFlushBoundary());
Assert.assertEquals(realtimeTuningConfig.getShardSpec(), spec);
Assert.assertEquals(realtimeTuningConfig.getIndexSpec(), indexSpec);
}
}

View File

@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.client.indexing.ClientAppendQuery;
import io.druid.client.indexing.ClientKillQuery;
import io.druid.client.indexing.ClientMergeQuery;
@ -51,10 +50,13 @@ import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.PlumberSchool;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.hamcrest.CoreMatchers;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.File;
import java.io.IOException;
@ -65,6 +67,9 @@ public class TaskSerdeTest
private final ObjectMapper jsonMapper;
private final IndexSpec indexSpec = new IndexSpec();
@Rule
public ExpectedException thrown = ExpectedException.none();
public TaskSerdeTest()
{
TestUtils testUtils = new TestUtils();
@ -75,6 +80,76 @@ public class TaskSerdeTest
}
}
@Test
public void testIndexTaskIOConfigDefaults() throws Exception
{
final IndexTask.IndexIOConfig ioConfig = jsonMapper.readValue(
"{\"type\":\"index\"}",
IndexTask.IndexIOConfig.class
);
Assert.assertEquals(false, ioConfig.isAppendToExisting());
Assert.assertEquals(false, ioConfig.isSkipFirehoseCaching());
}
@Test
public void testIndexTaskTuningConfigDefaults() throws Exception
{
final IndexTask.IndexTuningConfig tuningConfig = jsonMapper.readValue(
"{\"type\":\"index\"}",
IndexTask.IndexTuningConfig.class
);
Assert.assertEquals(true, tuningConfig.isBuildV9Directly());
Assert.assertEquals(false, tuningConfig.isForceExtendableShardSpecs());
Assert.assertEquals(false, tuningConfig.isReportParseExceptions());
Assert.assertEquals(new IndexSpec(), tuningConfig.getIndexSpec());
Assert.assertEquals(new Period(Integer.MAX_VALUE), tuningConfig.getIntermediatePersistPeriod());
Assert.assertEquals(0, tuningConfig.getMaxPendingPersists());
Assert.assertEquals(75000, tuningConfig.getMaxRowsInMemory());
Assert.assertEquals(null, tuningConfig.getNumShards());
Assert.assertEquals(5000000, (int) tuningConfig.getTargetPartitionSize());
}
@Test
public void testIndexTaskTuningConfigTargetPartitionSizeOrNumShards() throws Exception
{
IndexTask.IndexTuningConfig tuningConfig = jsonMapper.readValue(
"{\"type\":\"index\", \"targetPartitionSize\":10}",
IndexTask.IndexTuningConfig.class
);
Assert.assertEquals(10, (int) tuningConfig.getTargetPartitionSize());
Assert.assertEquals(null, tuningConfig.getNumShards());
tuningConfig = jsonMapper.readValue(
"{\"type\":\"index\", \"numShards\":10}",
IndexTask.IndexTuningConfig.class
);
Assert.assertEquals(null, tuningConfig.getTargetPartitionSize());
Assert.assertEquals(10, (int) tuningConfig.getNumShards());
tuningConfig = jsonMapper.readValue(
"{\"type\":\"index\", \"targetPartitionSize\":-1, \"numShards\":10}",
IndexTask.IndexTuningConfig.class
);
Assert.assertEquals(null, tuningConfig.getTargetPartitionSize());
Assert.assertEquals(10, (int) tuningConfig.getNumShards());
}
@Test
public void testIndexTaskTuningConfigTargetPartitionSizeAndNumShards() throws Exception
{
thrown.expectCause(CoreMatchers.isA(IllegalArgumentException.class));
jsonMapper.readValue(
"{\"type\":\"index\", \"targetPartitionSize\":10, \"numShards\":10}",
IndexTask.IndexTuningConfig.class
);
}
@Test
public void testIndexTaskSerde() throws Exception
{
@ -93,11 +168,11 @@ public class TaskSerdeTest
),
jsonMapper
),
new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null)),
new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec, null, false)
new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true, true),
new IndexTask.IndexTuningConfig(10000, 10, 9999, null, indexSpec, 3, true, true, true)
),
jsonMapper,
null
null,
jsonMapper
);
final String json = jsonMapper.writeValueAsString(task);
@ -106,14 +181,38 @@ public class TaskSerdeTest
final IndexTask task2 = (IndexTask) jsonMapper.readValue(json, Task.class);
Assert.assertEquals("foo", task.getDataSource());
Assert.assertEquals(new Interval("2010-01-01/P2D"), task.getInterval());
Assert.assertEquals(task.getId(), task2.getId());
Assert.assertEquals(task.getGroupId(), task2.getGroupId());
Assert.assertEquals(task.getDataSource(), task2.getDataSource());
Assert.assertEquals(task.getInterval(), task2.getInterval());
Assert.assertTrue(task.getIngestionSchema().getIOConfig().getFirehoseFactory() instanceof LocalFirehoseFactory);
Assert.assertTrue(task2.getIngestionSchema().getIOConfig().getFirehoseFactory() instanceof LocalFirehoseFactory);
IndexTask.IndexIOConfig taskIoConfig = task.getIngestionSchema().getIOConfig();
IndexTask.IndexIOConfig task2IoConfig = task2.getIngestionSchema().getIOConfig();
Assert.assertTrue(taskIoConfig.getFirehoseFactory() instanceof LocalFirehoseFactory);
Assert.assertTrue(task2IoConfig.getFirehoseFactory() instanceof LocalFirehoseFactory);
Assert.assertEquals(taskIoConfig.isAppendToExisting(), task2IoConfig.isAppendToExisting());
Assert.assertEquals(taskIoConfig.isSkipFirehoseCaching(), task2IoConfig.isSkipFirehoseCaching());
IndexTask.IndexTuningConfig taskTuningConfig = task.getIngestionSchema().getTuningConfig();
IndexTask.IndexTuningConfig task2TuningConfig = task2.getIngestionSchema().getTuningConfig();
Assert.assertEquals(taskTuningConfig.getBasePersistDirectory(), task2TuningConfig.getBasePersistDirectory());
Assert.assertEquals(taskTuningConfig.getIndexSpec(), task2TuningConfig.getIndexSpec());
Assert.assertEquals(
taskTuningConfig.getIntermediatePersistPeriod(),
task2TuningConfig.getIntermediatePersistPeriod()
);
Assert.assertEquals(taskTuningConfig.getMaxPendingPersists(), task2TuningConfig.getMaxPendingPersists());
Assert.assertEquals(taskTuningConfig.getMaxRowsInMemory(), task2TuningConfig.getMaxRowsInMemory());
Assert.assertEquals(taskTuningConfig.getNumShards(), task2TuningConfig.getNumShards());
Assert.assertEquals(taskTuningConfig.getTargetPartitionSize(), task2TuningConfig.getTargetPartitionSize());
Assert.assertEquals(taskTuningConfig.isBuildV9Directly(), task2TuningConfig.isBuildV9Directly());
Assert.assertEquals(
taskTuningConfig.isForceExtendableShardSpecs(),
task2TuningConfig.isForceExtendableShardSpecs()
);
Assert.assertEquals(taskTuningConfig.isReportParseExceptions(), task2TuningConfig.isReportParseExceptions());
}
@Test
@ -134,11 +233,11 @@ public class TaskSerdeTest
),
jsonMapper
),
new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null)),
new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec, null, false)
new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true, null),
new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true)
),
jsonMapper,
null
null,
jsonMapper
);
for (final Module jacksonModule : new FirehoseModule().getJacksonModules()) {
@ -151,7 +250,6 @@ public class TaskSerdeTest
final IndexTask task2 = (IndexTask) jsonMapper.readValue(json, Task.class);
Assert.assertEquals("foo", task.getDataSource());
Assert.assertEquals(new Interval("2010-01-01/P2D"), task.getInterval());
Assert.assertEquals(task.getId(), task2.getId());
Assert.assertEquals(2, task.getTaskResource().getRequiredCapacity());
@ -160,7 +258,6 @@ public class TaskSerdeTest
Assert.assertEquals(task.getTaskResource().getAvailabilityGroup(), task2.getTaskResource().getAvailabilityGroup());
Assert.assertEquals(task.getGroupId(), task2.getGroupId());
Assert.assertEquals(task.getDataSource(), task2.getDataSource());
Assert.assertEquals(task.getInterval(), task2.getInterval());
Assert.assertTrue(task.getIngestionSchema().getIOConfig().getFirehoseFactory() instanceof LocalFirehoseFactory);
Assert.assertTrue(task2.getIngestionSchema().getIOConfig().getFirehoseFactory() instanceof LocalFirehoseFactory);
}
@ -168,11 +265,13 @@ public class TaskSerdeTest
@Test
public void testMergeTaskSerde() throws Exception
{
final List<DataSegment> segments = ImmutableList.<DataSegment>of(DataSegment.builder()
.dataSource("foo")
.interval(new Interval("2010-01-01/P1D"))
.version("1234")
.build());
final List<DataSegment> segments = ImmutableList.<DataSegment>of(
DataSegment.builder()
.dataSource("foo")
.interval(new Interval("2010-01-01/P1D"))
.version("1234")
.build()
);
final List<AggregatorFactory> aggregators = ImmutableList.<AggregatorFactory>of(new CountAggregatorFactory("cnt"));
final MergeTask task = new MergeTask(
null,
@ -202,11 +301,15 @@ public class TaskSerdeTest
task2.getAggregators().get(0).getName()
);
final MergeTask task3 = (MergeTask) jsonMapper.readValue(jsonMapper.writeValueAsString(new ClientMergeQuery(
"foo",
segments,
aggregators
)), Task.class);
final MergeTask task3 = (MergeTask) jsonMapper.readValue(
jsonMapper.writeValueAsString(
new ClientMergeQuery(
"foo",
segments,
aggregators
)
), Task.class
);
Assert.assertEquals("foo", task3.getDataSource());
Assert.assertEquals(new Interval("2010-01-01/P1D"), task3.getInterval());
@ -237,10 +340,14 @@ public class TaskSerdeTest
Assert.assertEquals(task.getDataSource(), task2.getDataSource());
Assert.assertEquals(task.getInterval(), task2.getInterval());
final KillTask task3 = (KillTask) jsonMapper.readValue(jsonMapper.writeValueAsString(new ClientKillQuery(
"foo",
new Interval("2010-01-01/P1D")
)), Task.class);
final KillTask task3 = (KillTask) jsonMapper.readValue(
jsonMapper.writeValueAsString(
new ClientKillQuery(
"foo",
new Interval("2010-01-01/P1D")
)
), Task.class
);
Assert.assertEquals("foo", task3.getDataSource());
Assert.assertEquals(new Interval("2010-01-01/P1D"), task3.getInterval());
@ -421,10 +528,14 @@ public class TaskSerdeTest
Assert.assertEquals(task.getInterval(), task2.getInterval());
Assert.assertEquals(task.getSegments(), task2.getSegments());
final AppendTask task3 = (AppendTask) jsonMapper.readValue(jsonMapper.writeValueAsString(new ClientAppendQuery(
"foo",
segments
)), Task.class);
final AppendTask task3 = (AppendTask) jsonMapper.readValue(
jsonMapper.writeValueAsString(
new ClientAppendQuery(
"foo",
segments
)
), Task.class
);
Assert.assertEquals("foo", task3.getDataSource());
Assert.assertEquals(new Interval("2010-01-01/P2D"), task3.getInterval());
@ -521,10 +632,12 @@ public class TaskSerdeTest
);
final ConvertSegmentTask convertSegmentTaskOriginal = ConvertSegmentTask.create(
segment,
new IndexSpec(new RoaringBitmapSerdeFactory(null),
CompressedObjectStrategy.CompressionStrategy.LZF,
CompressedObjectStrategy.CompressionStrategy.UNCOMPRESSED,
CompressionFactory.LongEncodingStrategy.LONGS),
new IndexSpec(
new RoaringBitmapSerdeFactory(null),
CompressedObjectStrategy.CompressionStrategy.LZF,
CompressedObjectStrategy.CompressionStrategy.UNCOMPRESSED,
CompressionFactory.LongEncodingStrategy.LONGS
),
false,
true,
null

View File

@ -651,11 +651,11 @@ public class TaskLifecycleTest
),
mapper
),
new IndexTask.IndexIOConfig(new MockFirehoseFactory(false)),
new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec, null, false)
new IndexTask.IndexIOConfig(new MockFirehoseFactory(false), false, null),
new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true)
),
mapper,
null
null,
MAPPER
);
final Optional<TaskStatus> preRunTaskStatus = tsqa.getStatus(indexTask.getId());
@ -709,11 +709,11 @@ public class TaskLifecycleTest
),
mapper
),
new IndexTask.IndexIOConfig(new MockExceptionalFirehoseFactory()),
new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec, null, false)
new IndexTask.IndexIOConfig(new MockExceptionalFirehoseFactory(), false, null),
new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true)
),
mapper,
null
null,
MAPPER
);
final TaskStatus status = runTask(indexTask);
@ -1068,11 +1068,11 @@ public class TaskLifecycleTest
),
mapper
),
new IndexTask.IndexIOConfig(new MockFirehoseFactory(false)),
new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec, null, false)
new IndexTask.IndexIOConfig(new MockFirehoseFactory(false), false, null),
new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, null, false, null, null)
),
mapper,
null
null,
MAPPER
);
final long startTime = System.currentTimeMillis();

View File

@ -30,6 +30,7 @@ import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
import io.druid.segment.realtime.firehose.FixedCountFirehoseFactory;
import io.druid.segment.realtime.firehose.IrcFirehoseFactory;
import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
import io.druid.segment.realtime.firehose.ReplayableFirehoseFactory;
import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
import java.util.Arrays;
@ -56,6 +57,7 @@ public class FirehoseModule implements DruidModule
new NamedType(LocalFirehoseFactory.class, "local"),
new NamedType(EventReceiverFirehoseFactory.class, "receiver"),
new NamedType(CombiningFirehoseFactory.class, "combining"),
new NamedType(ReplayableFirehoseFactory.class, "replayable"),
new NamedType(FixedCountFirehoseFactory.class, "fixedCount")
)
);

View File

@ -194,4 +194,9 @@ public class ArbitraryGranularitySpec implements GranularitySpec
result = 31 * result + (timezone != null ? timezone.hashCode() : 0);
return result;
}
@Override
public GranularitySpec withIntervals(List<Interval> inputIntervals) {
return new ArbitraryGranularitySpec(queryGranularity, rollup, inputIntervals, timezone);
}
}

View File

@ -66,11 +66,13 @@ public interface GranularitySpec
*/
public Optional<Interval> bucketInterval(DateTime dt);
public Granularity getSegmentGranularity();
Granularity getSegmentGranularity();
public boolean isRollup();
boolean isRollup();
public QueryGranularity getQueryGranularity();
QueryGranularity getQueryGranularity();
public String getTimezone();
String getTimezone();
GranularitySpec withIntervals(List<Interval> inputIntervals);
}

View File

@ -189,4 +189,9 @@ public class UniformGranularitySpec implements GranularitySpec
result = 31 * result + (wrappedSpec != null ? wrappedSpec.hashCode() : 0);
return result;
}
@Override
public GranularitySpec withIntervals(List<Interval> inputIntervals) {
return new UniformGranularitySpec(segmentGranularity, queryGranularity, rollup, inputIntervals, timezone);
}
}

View File

@ -0,0 +1,319 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.realtime.firehose;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.fasterxml.jackson.dataformat.smile.SmileGenerator;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Ordering;
import com.google.common.io.CountingOutputStream;
import com.google.common.io.Files;
import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.data.input.Rows;
import io.druid.data.input.impl.InputRowParser;
import io.druid.guice.annotations.Smile;
import io.druid.java.util.common.parsers.ParseException;
import io.druid.utils.Runnables;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
/**
* Creates a wrapper firehose that writes from another firehose to disk and then serves nextRow() from disk. Useful for
* tasks that require multiple passes through the data to prevent multiple remote fetches. Also has support for
* retrying fetches if the underlying firehose throws an exception while the local cache is being generated.
*/
public class ReplayableFirehoseFactory implements FirehoseFactory<InputRowParser>
{
private static final EmittingLogger log = new EmittingLogger(ReplayableFirehoseFactory.class);
private static final boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = false;
private static final int DEFAULT_MAX_TEMP_FILE_SIZE = 250000000;
private static final int DEFAULT_READ_FIREHOSE_RETRIES = 3;
private final FirehoseFactory delegateFactory;
private final boolean reportParseExceptions;
// This is *roughly* the max size of the temp files that will be generated, but they may be slightly larger. The
// reason for the approximation is that we're not forcing flushes after writing to the generator so the number of
// bytes written to the stream won't be updated until the flush occurs. It's probably more important to optimize for
// I/O speed rather than maintaining a strict max on the size of the temp file before it's split.
private final int maxTempFileSize;
private final int readFirehoseRetries;
private final ObjectMapper smileMapper;
private ReplayableFirehose firehose;
@JsonCreator
public ReplayableFirehoseFactory(
@JsonProperty("delegate") FirehoseFactory delegateFactory,
@JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
@JsonProperty("maxTempFileSize") Integer maxTempFileSize,
@JsonProperty("readFirehoseRetries") Integer readFirehoseRetries,
@Smile @JacksonInject ObjectMapper smileMapper
)
{
Preconditions.checkNotNull(delegateFactory, "delegate cannot be null");
Preconditions.checkArgument(
!(delegateFactory instanceof ReplayableFirehoseFactory),
"Refusing to wrap another ReplayableFirehoseFactory"
);
this.delegateFactory = delegateFactory;
this.reportParseExceptions = reportParseExceptions == null
? DEFAULT_REPORT_PARSE_EXCEPTIONS
: reportParseExceptions;
this.maxTempFileSize = maxTempFileSize == null ? DEFAULT_MAX_TEMP_FILE_SIZE : maxTempFileSize;
this.readFirehoseRetries = readFirehoseRetries == null ? DEFAULT_READ_FIREHOSE_RETRIES : readFirehoseRetries;
this.smileMapper = smileMapper;
log.info(this.toString());
}
@Override
public Firehose connect(InputRowParser parser) throws IOException
{
if (firehose == null) {
firehose = new ReplayableFirehose(parser);
} else {
log.info("Rewinding and returning existing firehose");
firehose.rewind();
}
return firehose;
}
public class ReplayableFirehose implements Firehose
{
private final List<File> files = new ArrayList<>();
private final List<String> dimensions;
private int fileIndex = 0;
private JsonFactory jsonFactory;
private JsonParser jsonParser;
private Iterator<Row> it;
public ReplayableFirehose(InputRowParser parser) throws IOException
{
jsonFactory = smileMapper.getFactory();
if (jsonFactory instanceof SmileFactory) {
jsonFactory = ((SmileFactory) jsonFactory).enable(SmileGenerator.Feature.CHECK_SHARED_STRING_VALUES);
}
long counter = 0, totalBytes = 0, unparseable = 0, retryCount = 0;
Set<String> dimensionScratch = new HashSet<>();
File tmpDir = Files.createTempDir();
tmpDir.deleteOnExit();
long startTime = System.nanoTime();
boolean isDone = false;
do {
deleteTempFiles();
try (Firehose delegateFirehose = delegateFactory.connect(parser)) {
while (delegateFirehose.hasMore()) {
File tmpFile = File.createTempFile("replayable-", null, tmpDir);
tmpFile.deleteOnExit();
files.add(tmpFile);
log.debug("Created file [%s]", tmpFile.getAbsolutePath());
try (CountingOutputStream cos = new CountingOutputStream(new FileOutputStream(tmpFile));
JsonGenerator generator = jsonFactory.createGenerator(cos)) {
while (delegateFirehose.hasMore() && cos.getCount() < getMaxTempFileSize()) {
try {
InputRow row = delegateFirehose.nextRow();
generator.writeObject(row);
dimensionScratch.addAll(row.getDimensions());
counter++;
}
catch (ParseException e) {
if (reportParseExceptions) {
throw e;
}
unparseable++;
}
}
totalBytes += cos.getCount();
}
}
isDone = true;
}
catch (Exception e) {
if (++retryCount <= readFirehoseRetries && !(e instanceof ParseException)) {
log.error(e, "Delegate firehose threw an exception, retrying (%d of %d)", retryCount, readFirehoseRetries);
} else {
log.error(e, "Delegate firehose threw an exception, retries exhausted, aborting");
Throwables.propagate(e);
}
}
} while (!isDone);
log.info(
"Finished reading from firehose in [%,dms], [%,d] events parsed, [%,d] bytes written, [%,d] events unparseable",
(System.nanoTime() - startTime) / 1000000,
counter,
totalBytes,
unparseable
);
dimensions = Ordering.natural().immutableSortedCopy(dimensionScratch);
if (counter == 0) {
log.warn("Firehose contains no events!");
deleteTempFiles();
it = Iterators.emptyIterator();
} else {
jsonParser = jsonFactory.createParser(files.get(fileIndex));
it = jsonParser.readValuesAs(Row.class);
}
}
@Override
public boolean hasMore()
{
if (it.hasNext()) {
return true;
}
try {
if (jsonParser != null) {
jsonParser.close();
}
if (++fileIndex >= files.size() || files.get(fileIndex).length() == 0) {
return false;
}
jsonParser = jsonFactory.createParser(files.get(fileIndex));
it = jsonParser.readValuesAs(Row.class);
return true;
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public InputRow nextRow()
{
return Rows.toCaseInsensitiveInputRow(it.next(), dimensions);
}
@Override
public Runnable commit()
{
return Runnables.getNoopRunnable();
}
/**
* Closes the firehose by closing the input stream and setting an empty iterator. The underlying cache files
* backing the firehose are retained for when the firehose is "replayed" by calling rewind(). The cache files are
* deleted by File.deleteOnExit() when the process exits.
*/
@Override
public void close() throws IOException
{
if (jsonParser != null) {
jsonParser.close();
}
it = Iterators.emptyIterator();
}
private void rewind() throws IOException
{
close();
if (!files.isEmpty()) {
fileIndex = 0;
jsonParser = jsonFactory.createParser(files.get(fileIndex));
it = jsonParser.readValuesAs(Row.class);
}
}
private void deleteTempFiles()
{
for (File file : files) {
log.debug("Deleting temp file: %s", file.getAbsolutePath());
file.delete();
}
files.clear();
}
}
@JsonProperty("delegate")
public FirehoseFactory getDelegateFactory()
{
return delegateFactory;
}
@JsonProperty("reportParseExceptions")
public boolean isReportParseExceptions()
{
return reportParseExceptions;
}
@JsonProperty("maxTempFileSize")
public int getMaxTempFileSize()
{
return maxTempFileSize;
}
@JsonProperty("readFirehoseRetries")
public int getReadFirehoseRetries()
{
return readFirehoseRetries;
}
@Override
public String toString()
{
return "ReplayableFirehoseFactory{" +
"delegateFactory=" + delegateFactory +
", reportParseExceptions=" + reportParseExceptions +
", maxTempFileSize=" + maxTempFileSize +
", readFirehoseRetries=" + readFirehoseRetries +
'}';
}
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.realtime.plumber;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.SegmentDescriptor;
import java.util.concurrent.Executor;
public class NoopSegmentHandoffNotifierFactory implements SegmentHandoffNotifierFactory
{
private static final Logger log = new Logger(NoopSegmentHandoffNotifierFactory.class);
private static final SegmentHandoffNotifier NOTIFIER = new SegmentHandoffNotifier()
{
@Override
public boolean registerSegmentHandoffCallback(SegmentDescriptor descriptor, Executor exec, Runnable handOffRunnable)
{
log.info("Not waiting for segment to be handed off, executing handOffRunnable");
exec.execute(handOffRunnable);
return true;
}
@Override
public void start() {}
@Override
public void close() {}
};
@Override
public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource)
{
return NOTIFIER;
}
}

View File

@ -0,0 +1,447 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.realtime.firehose;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.MapInputRowParser;
import io.druid.data.input.impl.TimeAndDimsParseSpec;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.parsers.ParseException;
import io.druid.segment.realtime.firehose.ReplayableFirehoseFactory;
import org.easymock.EasyMockSupport;
import org.easymock.IAnswer;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
public class ReplayableFirehoseFactoryTest extends EasyMockSupport
{
private FirehoseFactory delegateFactory = createMock(FirehoseFactory.class);
private Firehose delegateFirehose = createMock(Firehose.class);
private InputRowParser parser = new MapInputRowParser(new TimeAndDimsParseSpec(null, null));
private ObjectMapper mapper = new DefaultObjectMapper();
private List<InputRow> testRows = Lists.<InputRow>newArrayList(
new MapBasedInputRow(
DateTime.now(), Lists.newArrayList("dim1", "dim2"),
ImmutableMap.<String, Object>of("dim1", "val1", "dim2", "val2", "met1", 1)
),
new MapBasedInputRow(
DateTime.now(), Lists.newArrayList("dim1", "dim2"),
ImmutableMap.<String, Object>of("dim1", "val5", "dim2", "val2", "met1", 2)
),
new MapBasedInputRow(
DateTime.now(), Lists.newArrayList("dim2", "dim3"),
ImmutableMap.<String, Object>of("dim2", "val1", "dim3", "val2", "met1", 3)
)
);
private ReplayableFirehoseFactory replayableFirehoseFactory;
@Before
public void setup()
{
replayableFirehoseFactory = new ReplayableFirehoseFactory(
delegateFactory,
true,
10000,
3,
mapper
);
}
@Test
public void testConstructor() throws Exception
{
Assert.assertEquals(delegateFactory, replayableFirehoseFactory.getDelegateFactory());
Assert.assertEquals(10000, replayableFirehoseFactory.getMaxTempFileSize());
Assert.assertEquals(3, replayableFirehoseFactory.getReadFirehoseRetries());
Assert.assertEquals(true, replayableFirehoseFactory.isReportParseExceptions());
}
@Test
public void testReplayableFirehoseNoEvents() throws Exception
{
expect(delegateFactory.connect(parser)).andReturn(delegateFirehose);
expect(delegateFirehose.hasMore()).andReturn(false);
delegateFirehose.close();
replayAll();
try (Firehose firehose = replayableFirehoseFactory.connect(parser)) {
Assert.assertFalse(firehose.hasMore());
}
verifyAll();
}
@Test
public void testReplayableFirehoseWithEvents() throws Exception
{
final boolean hasMore[] = {true};
expect(delegateFactory.connect(parser)).andReturn(delegateFirehose);
expect(delegateFirehose.hasMore()).andAnswer(
new IAnswer<Boolean>()
{
@Override
public Boolean answer() throws Throwable
{
return hasMore[0];
}
}
).anyTimes();
expect(delegateFirehose.nextRow())
.andReturn(testRows.get(0))
.andReturn(testRows.get(1))
.andAnswer(
new IAnswer<InputRow>()
{
@Override
public InputRow answer() throws Throwable
{
hasMore[0] = false;
return testRows.get(2);
}
}
);
delegateFirehose.close();
replayAll();
List<InputRow> rows = Lists.newArrayList();
try (Firehose firehose = replayableFirehoseFactory.connect(parser)) {
while (firehose.hasMore()) {
rows.add(firehose.nextRow());
}
}
Assert.assertEquals(testRows, rows);
// now replay!
rows.clear();
try (Firehose firehose = replayableFirehoseFactory.connect(parser)) {
while (firehose.hasMore()) {
rows.add(firehose.nextRow());
}
}
Assert.assertEquals(testRows, rows);
verifyAll();
}
@Test
public void testReplayableFirehoseWithoutReportParseExceptions() throws Exception
{
final boolean hasMore[] = {true};
replayableFirehoseFactory = new ReplayableFirehoseFactory(
delegateFactory,
false,
10000,
3,
mapper
);
expect(delegateFactory.connect(parser)).andReturn(delegateFirehose);
expect(delegateFirehose.hasMore()).andAnswer(
new IAnswer<Boolean>()
{
@Override
public Boolean answer() throws Throwable
{
return hasMore[0];
}
}
).anyTimes();
expect(delegateFirehose.nextRow())
.andReturn(testRows.get(0))
.andReturn(testRows.get(1))
.andThrow(new ParseException("unparseable!"))
.andAnswer(
new IAnswer<InputRow>()
{
@Override
public InputRow answer() throws Throwable
{
hasMore[0] = false;
return testRows.get(2);
}
}
);
delegateFirehose.close();
replayAll();
List<InputRow> rows = Lists.newArrayList();
try (Firehose firehose = replayableFirehoseFactory.connect(parser)) {
while (firehose.hasMore()) {
rows.add(firehose.nextRow());
}
}
Assert.assertEquals(testRows, rows);
verifyAll();
}
@Test(expected = ParseException.class)
public void testReplayableFirehoseWithReportParseExceptions() throws Exception
{
final boolean hasMore[] = {true};
expect(delegateFactory.connect(parser)).andReturn(delegateFirehose);
expect(delegateFirehose.hasMore()).andAnswer(
new IAnswer<Boolean>()
{
@Override
public Boolean answer() throws Throwable
{
return hasMore[0];
}
}
).anyTimes();
expect(delegateFirehose.nextRow())
.andReturn(testRows.get(0))
.andReturn(testRows.get(1))
.andThrow(new ParseException("unparseable!"))
.andAnswer(
new IAnswer<InputRow>()
{
@Override
public InputRow answer() throws Throwable
{
hasMore[0] = false;
return testRows.get(2);
}
}
);
delegateFirehose.close();
replayAll();
replayableFirehoseFactory.connect(parser);
verifyAll();
}
@Test
public void testReplayableFirehoseWithConnectRetries() throws Exception
{
final boolean hasMore[] = {true};
expect(delegateFactory.connect(parser)).andThrow(new IOException())
.andReturn(delegateFirehose);
expect(delegateFirehose.hasMore()).andAnswer(
new IAnswer<Boolean>()
{
@Override
public Boolean answer() throws Throwable
{
return hasMore[0];
}
}
).anyTimes();
expect(delegateFirehose.nextRow())
.andReturn(testRows.get(0))
.andReturn(testRows.get(1))
.andAnswer(
new IAnswer<InputRow>()
{
@Override
public InputRow answer() throws Throwable
{
hasMore[0] = false;
return testRows.get(2);
}
}
);
delegateFirehose.close();
replayAll();
List<InputRow> rows = Lists.newArrayList();
try (Firehose firehose = replayableFirehoseFactory.connect(parser)) {
while (firehose.hasMore()) {
rows.add(firehose.nextRow());
}
}
Assert.assertEquals(testRows, rows);
verifyAll();
}
@Test
public void testReplayableFirehoseWithNextRowRetries() throws Exception
{
final boolean hasMore[] = {true};
expect(delegateFactory.connect(parser)).andReturn(delegateFirehose).times(2);
expect(delegateFirehose.hasMore()).andAnswer(
new IAnswer<Boolean>()
{
@Override
public Boolean answer() throws Throwable
{
return hasMore[0];
}
}
).anyTimes();
expect(delegateFirehose.nextRow())
.andReturn(testRows.get(0))
.andThrow(new RuntimeException())
.andReturn(testRows.get(0))
.andReturn(testRows.get(1))
.andAnswer(
new IAnswer<InputRow>()
{
@Override
public InputRow answer() throws Throwable
{
hasMore[0] = false;
return testRows.get(2);
}
}
);
delegateFirehose.close();
expectLastCall().times(2);
replayAll();
List<InputRow> rows = Lists.newArrayList();
try (Firehose firehose = replayableFirehoseFactory.connect(parser)) {
while (firehose.hasMore()) {
rows.add(firehose.nextRow());
}
}
Assert.assertEquals(testRows, rows);
verifyAll();
}
@Test(expected = TestReadingException.class)
public void testReplayableFirehoseWithNoRetries() throws Exception
{
replayableFirehoseFactory = new ReplayableFirehoseFactory(
delegateFactory,
false,
10000,
0,
mapper
);
expect(delegateFactory.connect(parser)).andReturn(delegateFirehose);
expect(delegateFirehose.hasMore()).andReturn(true).times(2);
expect(delegateFirehose.nextRow()).andThrow(new TestReadingException());
delegateFirehose.close();
expectLastCall();
replayAll();
replayableFirehoseFactory.connect(parser);
verifyAll();
}
@Test
public void testReplayableFirehoseWithMultipleFiles() throws Exception
{
replayableFirehoseFactory = new ReplayableFirehoseFactory(delegateFactory, false, 1, 3, mapper);
final boolean hasMore[] = {true};
final int multiplicationFactor = 500;
final InputRow finalRow = new MapBasedInputRow(
DateTime.now(), Lists.newArrayList("dim4", "dim5"),
ImmutableMap.<String, Object>of("dim4", "val12", "dim5", "val20", "met1", 30)
);
expect(delegateFactory.connect(parser)).andReturn(delegateFirehose);
expect(delegateFirehose.hasMore()).andAnswer(
new IAnswer<Boolean>()
{
@Override
public Boolean answer() throws Throwable
{
return hasMore[0];
}
}
).anyTimes();
expect(delegateFirehose.nextRow())
.andReturn(testRows.get(0)).times(multiplicationFactor)
.andReturn(testRows.get(1)).times(multiplicationFactor)
.andReturn(testRows.get(2)).times(multiplicationFactor)
.andAnswer(
new IAnswer<InputRow>()
{
@Override
public InputRow answer() throws Throwable
{
hasMore[0] = false;
return finalRow;
}
}
);
delegateFirehose.close();
replayAll();
List<InputRow> testRowsMultiplied = Lists.newArrayList();
for (InputRow row : testRows) {
for (int i = 0; i < multiplicationFactor; i++) {
testRowsMultiplied.add(row);
}
}
testRowsMultiplied.add(finalRow);
List<InputRow> rows = Lists.newArrayList();
try (Firehose firehose = replayableFirehoseFactory.connect(parser)) {
while (firehose.hasMore()) {
rows.add(firehose.nextRow());
}
}
Assert.assertEquals(testRowsMultiplied, rows);
// now replay!
rows.clear();
try (Firehose firehose = replayableFirehoseFactory.connect(parser)) {
while (firehose.hasMore()) {
rows.add(firehose.nextRow());
}
}
Assert.assertEquals(testRowsMultiplied, rows);
verifyAll();
}
private class TestReadingException extends RuntimeException
{
}
}