fix passing of config from IndexTuningConfig to RealtimeTuningConfig

- pass rowFlushboundary correctly instead of using default.
- fixes indexTask failing with
io.druid.segment.incremental.IndexSizeExceededException when
rowFlushboundary is set higher than
RealtimeTuningConfig.defaultMaxRowsInMemory

rename test method
This commit is contained in:
nishant 2015-06-10 19:21:39 +05:30
parent 6ae4ecc7d4
commit 191b302f6a
2 changed files with 92 additions and 68 deletions

View File

@ -118,6 +118,24 @@ public class IndexTask extends AbstractFixedIntervalTask
);
}
static RealtimeTuningConfig convertTuningConfig(ShardSpec spec, IndexTuningConfig config)
{
return new RealtimeTuningConfig(
config.getRowFlushBoundary(),
null,
null,
null,
null,
null,
null,
spec,
config.getIndexSpec(),
null,
null,
null
);
}
@JsonIgnore
private final IndexIngestionSpec ingestionSchema;
@ -337,20 +355,7 @@ public class IndexTask extends AbstractFixedIntervalTask
tmpDir
).findPlumber(
schema,
new RealtimeTuningConfig(
null,
null,
null,
null,
null,
null,
null,
shardSpec,
ingestionSchema.getTuningConfig().getIndexSpec(),
null,
null,
null
),
convertTuningConfig(shardSpec, ingestionSchema.getTuningConfig()),
metrics
);

View File

@ -37,11 +37,14 @@ import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.segment.IndexSpec;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NumberedShardSpec;
import io.druid.timeline.partition.ShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
@ -234,64 +237,80 @@ public class IndexTaskTest
}
@Test
public void testIntervalBucketing() throws Exception
{
File tmpDir = Files.createTempDir();
tmpDir.deleteOnExit();
public void testIntervalBucketing() throws Exception
{
File tmpDir = Files.createTempDir();
tmpDir.deleteOnExit();
File tmpFile = File.createTempFile("druid", "index", tmpDir);
tmpFile.deleteOnExit();
File tmpFile = File.createTempFile("druid", "index", tmpDir);
tmpFile.deleteOnExit();
PrintWriter writer = new PrintWriter(tmpFile);
writer.println("2015-03-01T07:59:59.977Z,a,1");
writer.println("2015-03-01T08:00:00.000Z,b,1");
writer.close();
PrintWriter writer = new PrintWriter(tmpFile);
writer.println("2015-03-01T07:59:59.977Z,a,1");
writer.println("2015-03-01T08:00:00.000Z,b,1");
writer.close();
IndexTask indexTask = new IndexTask(
null,
new IndexTask.IndexIngestionSpec(
new DataSchema(
"test",
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec(
"ts",
"auto",
null
),
new DimensionsSpec(
Arrays.asList("dim"),
Lists.<String>newArrayList(),
Lists.<SpatialDimensionSchema>newArrayList()
),
null,
Arrays.asList("ts", "dim", "val")
)
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("val", "val")
},
new UniformGranularitySpec(
Granularity.HOUR,
QueryGranularity.HOUR,
Arrays.asList(new Interval("2015-03-01T08:00:00Z/2015-03-01T09:00:00Z"))
)
),
new IndexTask.IndexIOConfig(
new LocalFirehoseFactory(
tmpDir,
"druid*",
null
)
),
null
),
new DefaultObjectMapper()
);
IndexTask indexTask = new IndexTask(
null,
new IndexTask.IndexIngestionSpec(
new DataSchema(
"test",
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec(
"ts",
"auto",
null
),
new DimensionsSpec(
Arrays.asList("dim"),
Lists.<String>newArrayList(),
Lists.<SpatialDimensionSchema>newArrayList()
),
null,
Arrays.asList("ts", "dim", "val")
)
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("val", "val")
},
new UniformGranularitySpec(
Granularity.HOUR,
QueryGranularity.HOUR,
Arrays.asList(new Interval("2015-03-01T08:00:00Z/2015-03-01T09:00:00Z"))
)
),
new IndexTask.IndexIOConfig(
new LocalFirehoseFactory(
tmpDir,
"druid*",
null
)
),
null
),
new DefaultObjectMapper()
);
final List<DataSegment> segments = runTask(indexTask);
final List<DataSegment> segments = runTask(indexTask);
Assert.assertEquals(1, segments.size());
}
Assert.assertEquals(1, segments.size());
}
@Test
public void testConvertProps()
{
ShardSpec spec = new NumberedShardSpec(1, 2);
IndexTask.IndexTuningConfig config = new IndexTask.IndexTuningConfig(
100,
1000,
null,
new IndexSpec()
);
RealtimeTuningConfig realtimeTuningConfig = IndexTask.convertTuningConfig(spec, config);
Assert.assertEquals(realtimeTuningConfig.getMaxRowsInMemory(), config.getRowFlushBoundary());
Assert.assertEquals(realtimeTuningConfig.getShardSpec(), spec);
Assert.assertEquals(realtimeTuningConfig.getIndexSpec(), indexSpec);
}
}