unit tests passing after more refactoring

fjy 2014-02-27 15:21:09 -08:00
parent 1afcc71227
commit bf2ddda897
22 changed files with 631 additions and 506 deletions

View File

@@ -48,7 +48,7 @@ import java.util.Map;
/**
*/
public class HadoopIngestionSchema implements IngestionSchema
public class HadoopIngestionSchema extends IngestionSchema<HadoopIOConfig, HadoopDriverConfig>
{
public static HadoopIngestionSchema convertLegacy(
String dataSource,
@@ -138,6 +138,8 @@ public class HadoopIngestionSchema implements IngestionSchema
final @JsonProperty("targetPartitionSize") Long targetPartitionSize
)
{
super(dataSchema, ioConfig, driverConfig);
if (dataSchema != null) {
this.dataSchema = dataSchema;
this.ioConfig = ioConfig;

View File

@@ -1,92 +0,0 @@
package io.druid.indexer.schema;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.segment.indexing.DriverConfig;
/**
*/
@JsonTypeName("batch")
public class BatchDriverConfig implements DriverConfig
{
private final String workingPath;
private final String segmentOutputPath;
private final String version;
private final PartitionsSpec partitionsSpec;
private final boolean leaveIntermediate;
private final boolean cleanupOnFailure;
private final boolean overwriteFiles;
private final boolean ignoreInvalidRows;
@JsonCreator
public BatchDriverConfig(
final @JsonProperty("workingPath") String workingPath,
final @JsonProperty("segmentOutputPath") String segmentOutputPath,
final @JsonProperty("version") String version,
final @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec,
final @JsonProperty("leaveIntermediate") boolean leaveIntermediate,
final @JsonProperty("cleanupOnFailure") Boolean cleanupOnFailure,
final @JsonProperty("overwriteFiles") boolean overwriteFiles,
final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows
)
{
this.workingPath = workingPath;
this.segmentOutputPath = segmentOutputPath;
this.version = version;
this.partitionsSpec = partitionsSpec;
this.leaveIntermediate = leaveIntermediate;
this.cleanupOnFailure = (cleanupOnFailure == null) ? true : cleanupOnFailure;
this.overwriteFiles = overwriteFiles;
this.ignoreInvalidRows = ignoreInvalidRows;
}
@JsonProperty
public String getWorkingPath()
{
return workingPath;
}
@JsonProperty
public String getSegmentOutputPath()
{
return segmentOutputPath;
}
@JsonProperty
public String getVersion()
{
return version;
}
@JsonProperty
public PartitionsSpec getPartitionsSpec()
{
return partitionsSpec;
}
@JsonProperty
public boolean isLeaveIntermediate()
{
return leaveIntermediate;
}
@JsonProperty
public boolean isCleanupOnFailure()
{
return cleanupOnFailure;
}
@JsonProperty
public boolean isOverwriteFiles()
{
return overwriteFiles;
}
@JsonProperty
public boolean isIgnoreInvalidRows()
{
return ignoreInvalidRows;
}
}

View File

@@ -1,39 +0,0 @@
package io.druid.indexer.schema;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.druid.indexer.path.PathSpec;
import io.druid.indexer.updater.DbUpdaterJobSpec;
import io.druid.segment.indexing.IOConfig;
/**
*/
@JsonTypeName("batch")
public class BatchIOConfig implements IOConfig
{
private final PathSpec pathSpec;
private final DbUpdaterJobSpec updaterJobSpec;
@JsonCreator
public BatchIOConfig(
@JsonProperty("pathSpec") PathSpec pathSpec,
@JsonProperty("updaterJobSpec") DbUpdaterJobSpec updaterJobSpec
)
{
this.pathSpec = pathSpec;
this.updaterJobSpec = updaterJobSpec;
}
@JsonProperty
public PathSpec getPathSpec()
{
return pathSpec;
}
@JsonProperty
public DbUpdaterJobSpec getUpdaterJobSpec()
{
return updaterJobSpec;
}
}

View File

@@ -22,7 +22,6 @@ package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@@ -45,8 +44,10 @@ import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.index.YeOldePlumberSchool;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.DriverConfig;
import io.druid.segment.indexing.IOConfig;
import io.druid.segment.indexing.IngestionSchema;
import io.druid.segment.indexing.RealtimeDriverConfig;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
@@ -58,7 +59,6 @@ import io.druid.timeline.partition.ShardSpec;
import io.druid.timeline.partition.SingleDimensionShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import java.io.File;
import java.io.IOException;
@@ -72,59 +72,77 @@ public class IndexTask extends AbstractFixedIntervalTask
{
private static final Logger log = new Logger(IndexTask.class);
@JsonIgnore
private final GranularitySpec granularitySpec;
private static String makeId(String id, IndexIngestionSchema ingestionSchema, String dataSource)
{
if (id == null) {
return String.format("index_%s_%s", makeDataSource(ingestionSchema, dataSource), new DateTime().toString());
}
@JsonProperty
private final List<SpatialDimensionSchema> spatialDimensions;
return id;
}
private static String makeDataSource(IndexIngestionSchema ingestionSchema, String dataSource)
{
if (ingestionSchema != null) {
return ingestionSchema.getDataSchema().getDataSource();
} else { // Backwards compatible
return dataSource;
}
}
private static Interval makeInterval(IndexIngestionSchema ingestionSchema, GranularitySpec granularitySpec)
{
GranularitySpec spec;
if (ingestionSchema != null) {
spec = ingestionSchema.getDataSchema().getGranularitySpec();
} else {
spec = granularitySpec;
}
return new Interval(
spec.bucketIntervals().get().first().getStart(),
spec.bucketIntervals().get().last().getEnd()
);
}
@JsonIgnore
private final AggregatorFactory[] aggregators;
@JsonIgnore
private final QueryGranularity indexGranularity;
@JsonIgnore
private final int targetPartitionSize;
@JsonIgnore
private final FirehoseFactory firehoseFactory;
@JsonIgnore
private final int rowFlushBoundary;
private final IndexIngestionSchema ingestionSchema;
@JsonCreator
public IndexTask(
@JsonProperty("id") String id,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("granularitySpec") GranularitySpec granularitySpec,
@JsonProperty("spatialDimensions") List<SpatialDimensionSchema> spatialDimensions,
@JsonProperty("aggregators") AggregatorFactory[] aggregators,
@JsonProperty("indexGranularity") QueryGranularity indexGranularity,
@JsonProperty("targetPartitionSize") int targetPartitionSize,
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("rowFlushBoundary") int rowFlushBoundary
@JsonProperty("config") IndexIngestionSchema ingestionSchema,
// Backwards Compatible
@JsonProperty("dataSource") final String dataSource,
@JsonProperty("granularitySpec") final GranularitySpec granularitySpec,
@JsonProperty("aggregators") final AggregatorFactory[] aggregators,
@JsonProperty("indexGranularity") final QueryGranularity indexGranularity,
@JsonProperty("targetPartitionSize") final int targetPartitionSize,
@JsonProperty("firehose") final FirehoseFactory firehoseFactory,
@JsonProperty("rowFlushBoundary") final int rowFlushBoundary
)
{
super(
// _not_ the version, just something uniqueish
id != null ? id : String.format("index_%s_%s", dataSource, new DateTime().toString()),
dataSource,
new Interval(
granularitySpec.bucketIntervals().get().first().getStart(),
granularitySpec.bucketIntervals().get().last().getEnd()
)
makeId(id, ingestionSchema, dataSource),
makeDataSource(ingestionSchema, dataSource),
makeInterval(ingestionSchema, granularitySpec)
);
this.granularitySpec = Preconditions.checkNotNull(granularitySpec, "granularitySpec");
this.spatialDimensions = (spatialDimensions == null)
? Lists.<SpatialDimensionSchema>newArrayList()
: spatialDimensions;
this.aggregators = aggregators;
this.indexGranularity = (indexGranularity == null) ? QueryGranularity.NONE : indexGranularity;
this.targetPartitionSize = targetPartitionSize;
this.firehoseFactory = Preconditions.checkNotNull(firehoseFactory, "firehoseFactory");
this.rowFlushBoundary = rowFlushBoundary;
if (ingestionSchema != null) {
this.ingestionSchema = ingestionSchema;
} else { // Backwards Compatible
this.ingestionSchema = new IndexIngestionSchema(
new DataSchema(
dataSource,
firehoseFactory.getParser(),
aggregators,
granularitySpec.withQueryGranularity(indexGranularity)
),
new IndexIOConfig(firehoseFactory),
new IndexDriverConfig(targetPartitionSize, rowFlushBoundary)
);
}
}
@Override
@@ -133,9 +151,18 @@ public class IndexTask extends AbstractFixedIntervalTask
return "index";
}
@JsonProperty("config")
public IndexIngestionSchema getIngestionSchema()
{
return ingestionSchema;
}
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
final GranularitySpec granularitySpec = ingestionSchema.getDataSchema().getGranularitySpec();
final int targetPartitionSize = ingestionSchema.getDriverConfig().getTargetPartitionSize();
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
final Set<DataSegment> segments = Sets.newHashSet();
@@ -154,12 +181,7 @@ public class IndexTask extends AbstractFixedIntervalTask
for (final ShardSpec shardSpec : shardSpecs) {
final DataSegment segment = generateSegment(
toolbox,
new DataSchema(
getDataSource(),
firehoseFactory.getParser(),
aggregators,
granularitySpec.withQueryGranularity(indexGranularity)
),
ingestionSchema.getDataSchema(),
shardSpec,
bucket,
myLock.getVersion()
@@ -173,6 +195,9 @@ public class IndexTask extends AbstractFixedIntervalTask
private SortedSet<Interval> getDataIntervals() throws IOException
{
final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory();
final GranularitySpec granularitySpec = ingestionSchema.getDataSchema().getGranularitySpec();
SortedSet<Interval> retVal = Sets.newTreeSet(Comparators.intervalsByStartThenEnd());
try (Firehose firehose = firehoseFactory.connect(firehoseFactory.getParser())) {
while (firehose.hasMore()) {
@@ -192,6 +217,8 @@ public class IndexTask extends AbstractFixedIntervalTask
{
log.info("Determining partitions for interval[%s] with targetPartitionSize[%d]", interval, targetPartitionSize);
final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory();
// The implementation of this determine partitions stuff is less than optimal. Should be done better.
// Blacklist dimensions that have multiple values per row
@@ -327,6 +354,9 @@ public class IndexTask extends AbstractFixedIntervalTask
)
);
final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory();
final int rowFlushBoundary = ingestionSchema.getDriverConfig().getRowFlushBoundary();
// We need to track published segments.
final List<DataSegment> pushedSegments = new CopyOnWriteArrayList<DataSegment>();
final DataSegmentPusher wrappedDataSegmentPusher = new DataSegmentPusher()
@@ -354,10 +384,10 @@ public class IndexTask extends AbstractFixedIntervalTask
version,
wrappedDataSegmentPusher,
tmpDir
).findPlumber(schema, new RealtimeDriverConfig(0, new Period(), shardSpec), metrics);
).findPlumber(schema, new RealtimeDriverConfig(null, null, null, null, null, null, null, shardSpec), metrics);
// rowFlushBoundary for this job
final int myRowFlushBoundary = this.rowFlushBoundary > 0
final int myRowFlushBoundary = rowFlushBoundary > 0
? rowFlushBoundary
: toolbox.getConfig().getDefaultRowFlushBoundary();
@@ -433,45 +463,92 @@ public class IndexTask extends AbstractFixedIntervalTask
return interval.contains(inputRow.getTimestampFromEpoch()) && shardSpec.isInChunk(inputRow);
}
@JsonProperty
public GranularitySpec getGranularitySpec()
public static class IndexIngestionSchema extends IngestionSchema<IndexIOConfig, IndexDriverConfig>
{
return granularitySpec;
private final DataSchema dataSchema;
private final IndexIOConfig ioConfig;
private final IndexDriverConfig driverConfig;
@JsonCreator
public IndexIngestionSchema(
@JsonProperty("dataSchema") DataSchema dataSchema,
@JsonProperty("ioConfig") IndexIOConfig ioConfig,
@JsonProperty("driverConfig") IndexDriverConfig driverConfig
)
{
super(dataSchema, ioConfig, driverConfig);
this.dataSchema = dataSchema;
this.ioConfig = ioConfig;
this.driverConfig = driverConfig;
}
@Override
@JsonProperty("dataSchema")
public DataSchema getDataSchema()
{
return dataSchema;
}
@Override
@JsonProperty("ioConfig")
public IndexIOConfig getIOConfig()
{
return ioConfig;
}
@Override
@JsonProperty("driverConfig")
public IndexDriverConfig getDriverConfig()
{
return driverConfig;
}
}
@JsonProperty
public AggregatorFactory[] getAggregators()
public static class IndexIOConfig implements IOConfig
{
return aggregators;
private final FirehoseFactory firehoseFactory;
@JsonCreator
public IndexIOConfig(
@JsonProperty("firehose") FirehoseFactory firehoseFactory
)
{
this.firehoseFactory = firehoseFactory;
}
@JsonProperty("firehose")
public FirehoseFactory getFirehoseFactory()
{
return firehoseFactory;
}
}
@JsonProperty
public QueryGranularity getIndexGranularity()
public static class IndexDriverConfig implements DriverConfig
{
return indexGranularity;
}
private final int targetPartitionSize;
private final int rowFlushBoundary;
@JsonProperty
public long getTargetPartitionSize()
{
return targetPartitionSize;
}
@JsonCreator
public IndexDriverConfig(
@JsonProperty("targetPartitionSize") int targetPartitionSize,
@JsonProperty("rowFlushBoundary") int rowFlushBoundary
)
{
this.targetPartitionSize = targetPartitionSize;
this.rowFlushBoundary = rowFlushBoundary;
}
@JsonProperty("firehose")
public FirehoseFactory getFirehoseFactory()
{
return firehoseFactory;
}
@JsonProperty
public int getTargetPartitionSize()
{
return targetPartitionSize;
}
@JsonProperty
public int getRowFlushBoundary()
{
return rowFlushBoundary;
}
@JsonProperty
public List<SpatialDimensionSchema> getSpatialDimensions()
{
return spatialDimensions;
@JsonProperty
public int getRowFlushBoundary()
{
return rowFlushBoundary;
}
}
}
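Taken together, the IndexTask changes above move every ingestion knob into one nested "config" object. A minimal sketch of the new-style construction (the helper class and the literal sizes are illustrative assumptions; the constructor shapes are the ones added in this diff):

// Hypothetical helper, not part of this commit.
import io.druid.data.input.FirehoseFactory;
import io.druid.indexing.common.task.IndexTask;
import io.druid.indexing.common.task.IndexTask.IndexDriverConfig;
import io.druid.indexing.common.task.IndexTask.IndexIOConfig;
import io.druid.indexing.common.task.IndexTask.IndexIngestionSchema;
import io.druid.segment.indexing.DataSchema;

public class ExampleIndexTaskBuilder
{
  public static IndexTask build(DataSchema dataSchema, FirehoseFactory firehoseFactory)
  {
    final IndexIngestionSchema schema = new IndexIngestionSchema(
        dataSchema,                          // serialized as "dataSchema"
        new IndexIOConfig(firehoseFactory),  // serialized as "ioConfig"; the factory itself as "firehose"
        new IndexDriverConfig(10000, 500000) // "driverConfig": targetPartitionSize, rowFlushBoundary
    );
    // With "config" present, the legacy arguments (dataSource, granularitySpec,
    // aggregators, indexGranularity, targetPartitionSize, firehose,
    // rowFlushBoundary) can all be left null or zero.
    return new IndexTask(null, schema, null, null, null, null, 0, null, 0);
  }
}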

View File

@@ -79,25 +79,7 @@ public class RealtimeIndexTask extends AbstractTask
}
@JsonIgnore
private final Schema schema;
@JsonIgnore
private final FirehoseFactory firehoseFactory;
@JsonIgnore
private final FireDepartmentConfig fireDepartmentConfig;
@JsonIgnore
private final Period windowPeriod;
@JsonIgnore
private final int maxPendingPersists;
@JsonIgnore
private final Granularity segmentGranularity;
@JsonIgnore
private final RejectionPolicyFactory rejectionPolicyFactory;
private final FireDepartment schema;
@JsonIgnore
private volatile Plumber plumber = null;
@@ -109,6 +91,8 @@ public class RealtimeIndexTask extends AbstractTask
public RealtimeIndexTask(
@JsonProperty("id") String id,
@JsonProperty("resource") TaskResource taskResource,
@JsonProperty("config") FireDepartment fireDepartment,
// To be deprecated
@JsonProperty("schema") Schema schema,
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("fireDepartmentConfig") FireDepartmentConfig fireDepartmentConfig,
@@ -120,34 +104,67 @@ public class RealtimeIndexTask extends AbstractTask
{
super(
id == null
? makeTaskId(schema.getDataSource(), schema.getShardSpec().getPartitionNum(), new DateTime().toString())
? makeTaskId(
fireDepartment == null
? schema.getDataSource()
: fireDepartment.getDataSchema().getDataSource(),
fireDepartment == null
? schema.getShardSpec().getPartitionNum()
: fireDepartment.getDriverConfig().getShardSpec().getPartitionNum(),
new DateTime().toString()
)
: id,
String.format(
"index_realtime_%s",
schema.getDataSource()
fireDepartment == null
? schema.getDataSource()
: fireDepartment.getDataSchema().getDataSource()
),
taskResource == null
? new TaskResource(
makeTaskId(
schema.getDataSource(),
schema.getShardSpec().getPartitionNum(),
fireDepartment == null
? schema.getDataSource()
: fireDepartment.getDataSchema().getDataSource(),
fireDepartment == null
? schema.getShardSpec().getPartitionNum()
: fireDepartment.getDriverConfig().getShardSpec().getPartitionNum(),
new DateTime().toString()
), 1
)
: taskResource,
schema.getDataSource()
(fireDepartment != null)
? fireDepartment.getDataSchema().getDataSource()
: schema.getDataSource()
);
this.schema = schema;
this.firehoseFactory = firehoseFactory;
this.fireDepartmentConfig = fireDepartmentConfig;
this.windowPeriod = windowPeriod;
this.maxPendingPersists = (maxPendingPersists == 0)
? RealtimePlumberSchool.DEFAULT_MAX_PENDING_PERSISTS
: maxPendingPersists;
this.segmentGranularity = segmentGranularity;
this.rejectionPolicyFactory = rejectionPolicyFactory;
if (fireDepartment != null) {
this.schema = fireDepartment;
} else {
this.schema = new FireDepartment(
new DataSchema(
schema.getDataSource(),
firehoseFactory == null ? null : firehoseFactory.getParser(),
schema.getAggregators(),
new UniformGranularitySpec(segmentGranularity, schema.getIndexGranularity(), null, segmentGranularity)
),
new RealtimeIOConfig(firehoseFactory, null),
new RealtimeDriverConfig(
fireDepartmentConfig == null ? null : fireDepartmentConfig.getMaxRowsInMemory(),
fireDepartmentConfig == null ? null : fireDepartmentConfig.getIntermediatePersistPeriod(),
windowPeriod,
null,
null,
rejectionPolicyFactory,
maxPendingPersists,
schema.getShardSpec()
),
null, null, null, null
);
}
}
@Override
@@ -197,18 +214,16 @@ public class RealtimeIndexTask extends AbstractTask
boolean normalExit = true;
// Set up firehose
final Period intermediatePersistPeriod = fireDepartmentConfig.getIntermediatePersistPeriod();
final Firehose firehose = firehoseFactory.connect(firehoseFactory.getParser());
final Period intermediatePersistPeriod = schema.getDriverConfig().getIntermediatePersistPeriod();
final Firehose firehose = schema.getIOConfig().getFirehoseFactory().connect(schema.getDataSchema().getParser());
// It would be nice to get the PlumberSchool in the constructor. Although that will need jackson injectables for
// stuff like the ServerView, which seems kind of odd? Perhaps revisit this when Guice has been introduced.
final RealtimePlumberSchool realtimePlumberSchool = new RealtimePlumberSchool(
windowPeriod,
final RealtimePlumberSchool plumberSchool = new RealtimePlumberSchool(
schema.getDriverConfig().getWindowPeriod(),
new File(toolbox.getTaskWorkDir(), "persist"),
segmentGranularity
schema.getDataSchema().getGranularitySpec().getSegmentGranularity()
);
realtimePlumberSchool.setDefaultMaxPendingPersists(maxPendingPersists);
final SegmentPublisher segmentPublisher = new TaskActionSegmentPublisher(this, toolbox);
// NOTE: We talk to the coordinator in various places in the plumber and we could be more robust to issues
@@ -289,36 +304,17 @@ public class RealtimeIndexTask extends AbstractTask
// NOTE: that redundant realtime tasks will upload to the same location. This can cause index.zip and
// NOTE: descriptor.json to mismatch, or it can cause historical nodes to load different instances of the
// NOTE: "same" segment.
realtimePlumberSchool.setDataSegmentPusher(toolbox.getSegmentPusher());
realtimePlumberSchool.setConglomerate(toolbox.getQueryRunnerFactoryConglomerate());
realtimePlumberSchool.setQueryExecutorService(toolbox.getQueryExecutorService());
realtimePlumberSchool.setVersioningPolicy(versioningPolicy);
realtimePlumberSchool.setSegmentAnnouncer(lockingSegmentAnnouncer);
realtimePlumberSchool.setSegmentPublisher(segmentPublisher);
realtimePlumberSchool.setServerView(toolbox.getNewSegmentServerView());
realtimePlumberSchool.setEmitter(toolbox.getEmitter());
plumberSchool.setDataSegmentPusher(toolbox.getSegmentPusher());
plumberSchool.setConglomerate(toolbox.getQueryRunnerFactoryConglomerate());
plumberSchool.setQueryExecutorService(toolbox.getQueryExecutorService());
plumberSchool.setSegmentAnnouncer(lockingSegmentAnnouncer);
plumberSchool.setSegmentPublisher(segmentPublisher);
plumberSchool.setServerView(toolbox.getNewSegmentServerView());
plumberSchool.setEmitter(toolbox.getEmitter());
if (this.rejectionPolicyFactory != null) {
realtimePlumberSchool.setRejectionPolicyFactory(rejectionPolicyFactory);
}
DataSchema dataSchema = new DataSchema(
schema.getDataSource(),
firehoseFactory.getParser(),
schema.getAggregators(),
new UniformGranularitySpec(
realtimePlumberSchool.getSegmentGranularity(),
schema.getIndexGranularity(),
null,
realtimePlumberSchool.getSegmentGranularity()
)
);
RealtimeIOConfig realtimeIOConfig = new RealtimeIOConfig(firehoseFactory, realtimePlumberSchool);
RealtimeDriverConfig driverConfig = new RealtimeDriverConfig(
fireDepartmentConfig.getMaxRowsInMemory(),
fireDepartmentConfig.getIntermediatePersistPeriod(),
schema.getShardSpec()
);
DataSchema dataSchema = schema.getDataSchema();
RealtimeIOConfig realtimeIOConfig = schema.getIOConfig();
RealtimeDriverConfig driverConfig = schema.getDriverConfig().withVersioningPolicy(versioningPolicy);
final FireDepartment fireDepartment = new FireDepartment(
dataSchema,
@@ -331,7 +327,7 @@ public class RealtimeIndexTask extends AbstractTask
);
final RealtimeMetricsMonitor metricsMonitor = new RealtimeMetricsMonitor(ImmutableList.of(fireDepartment));
this.queryRunnerFactoryConglomerate = toolbox.getQueryRunnerFactoryConglomerate();
this.plumber = realtimePlumberSchool.findPlumber(dataSchema, driverConfig, fireDepartment.getMetrics());
this.plumber = plumberSchool.findPlumber(dataSchema, driverConfig, fireDepartment.getMetrics());
try {
plumber.startJob();
@@ -368,7 +364,7 @@ public class RealtimeIndexTask extends AbstractTask
int currCount = sink.add(inputRow);
fireDepartment.getMetrics().incrementProcessed();
if (currCount >= fireDepartmentConfig.getMaxRowsInMemory() || System.currentTimeMillis() > nextFlush) {
if (currCount >= driverConfig.getMaxRowsInMemory() || System.currentTimeMillis() > nextFlush) {
plumber.persist(firehose.commit());
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
}
@@ -381,7 +377,7 @@ public class RealtimeIndexTask extends AbstractTask
}
catch (Throwable e) {
normalExit = false;
log.makeAlert(e, "Exception aborted realtime processing[%s]", schema.getDataSource())
log.makeAlert(e, "Exception aborted realtime processing[%s]", dataSchema.getDataSource())
.emit();
throw e;
}
@@ -404,42 +400,12 @@ public class RealtimeIndexTask extends AbstractTask
return TaskStatus.success(getId());
}
@JsonProperty
public Schema getSchema()
@JsonProperty("config")
public FireDepartment getRealtimeIngestionSchema()
{
return schema;
}
@JsonProperty("firehose")
public FirehoseFactory getFirehoseFactory()
{
return firehoseFactory;
}
@JsonProperty
public FireDepartmentConfig getFireDepartmentConfig()
{
return fireDepartmentConfig;
}
@JsonProperty
public Period getWindowPeriod()
{
return windowPeriod;
}
@JsonProperty
public Granularity getSegmentGranularity()
{
return segmentGranularity;
}
@JsonProperty("rejectionPolicy")
public RejectionPolicyFactory getRejectionPolicyFactory()
{
return rejectionPolicyFactory;
}
public static class TaskActionSegmentPublisher implements SegmentPublisher
{
final Task task;

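The realtime task follows the same pattern: one FireDepartment under "config", with the flat fields retained only for old specs. A sketch of the new-style call, assuming the legacy parameter list shown above (the trailing nulls and zero stand in for schema, firehose, fireDepartmentConfig, windowPeriod, maxPendingPersists, segmentGranularity and rejectionPolicy):

// Hypothetical helper, not part of this commit.
import io.druid.indexing.common.task.RealtimeIndexTask;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeDriverConfig;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.realtime.FireDepartment;

public class ExampleRealtimeTaskBuilder
{
  public static RealtimeIndexTask build(
      DataSchema dataSchema,
      RealtimeIOConfig ioConfig,
      RealtimeDriverConfig driverConfig
  )
  {
    // The four trailing nulls are the legacy Schema, FirehoseFactory,
    // FireDepartmentConfig and PlumberSchool arguments FireDepartment still accepts.
    final FireDepartment fireDepartment =
        new FireDepartment(dataSchema, ioConfig, driverConfig, null, null, null, null);
    // id and taskResource left null: the task derives both from the FireDepartment.
    return new RealtimeIndexTask(null, null, fireDepartment, null, null, null, null, 0, null, null);
  }
}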
View File

@@ -47,6 +47,7 @@ public class TestRealtimeTask extends RealtimeIndexTask
super(
id,
taskResource,
null,
new Schema(dataSource, null, new AggregatorFactory[]{}, QueryGranularity.NONE, new NoneShardSpec()),
null,
null,

View File

@@ -29,12 +29,12 @@ import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.guice.FirehoseModule;
import io.druid.indexer.HadoopIngestionSchema;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.indexer.rollup.DataRollupSpec;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.realtime.Schema;
import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
import io.druid.timeline.DataSegment;
@@ -52,6 +52,7 @@ public class TaskSerdeTest
public void testIndexTaskSerde() throws Exception
{
final IndexTask task = new IndexTask(
null,
null,
"foo",
new UniformGranularitySpec(
@@ -60,7 +61,6 @@ public class TaskSerdeTest
ImmutableList.of(new Interval("2010-01-01/P2D")),
Granularity.DAY
),
null,
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
QueryGranularity.NONE,
10000,
@@ -84,8 +84,8 @@ public class TaskSerdeTest
Assert.assertEquals(task.getGroupId(), task2.getGroupId());
Assert.assertEquals(task.getDataSource(), task2.getDataSource());
Assert.assertEquals(task.getInterval(), task2.getInterval());
Assert.assertTrue(task.getFirehoseFactory() instanceof LocalFirehoseFactory);
Assert.assertTrue(task2.getFirehoseFactory() instanceof LocalFirehoseFactory);
Assert.assertTrue(task.getIngestionSchema().getIOConfig().getFirehoseFactory() instanceof LocalFirehoseFactory);
Assert.assertTrue(task2.getIngestionSchema().getIOConfig().getFirehoseFactory() instanceof LocalFirehoseFactory);
}
@Test
@@ -198,6 +198,7 @@ public class TaskSerdeTest
final RealtimeIndexTask task = new RealtimeIndexTask(
null,
new TaskResource("rofl", 2),
null,
new Schema("foo", null, new AggregatorFactory[0], QueryGranularity.NONE, new NoneShardSpec()),
null,
null,
@@ -216,16 +217,29 @@ public class TaskSerdeTest
Assert.assertEquals("foo", task.getDataSource());
Assert.assertEquals(2, task.getTaskResource().getRequiredCapacity());
Assert.assertEquals("rofl", task.getTaskResource().getAvailabilityGroup());
Assert.assertEquals(new Period("PT10M"), task.getWindowPeriod());
Assert.assertEquals(Granularity.HOUR, task.getSegmentGranularity());
Assert.assertEquals(
new Period("PT10M"),
task.getRealtimeIngestionSchema()
.getDriverConfig().getWindowPeriod()
);
Assert.assertEquals(
Granularity.HOUR,
task.getRealtimeIngestionSchema().getDataSchema().getGranularitySpec().getSegmentGranularity()
);
Assert.assertEquals(task.getId(), task2.getId());
Assert.assertEquals(task.getGroupId(), task2.getGroupId());
Assert.assertEquals(task.getDataSource(), task2.getDataSource());
Assert.assertEquals(task.getTaskResource().getRequiredCapacity(), task2.getTaskResource().getRequiredCapacity());
Assert.assertEquals(task.getTaskResource().getAvailabilityGroup(), task2.getTaskResource().getAvailabilityGroup());
Assert.assertEquals(task.getWindowPeriod(), task2.getWindowPeriod());
Assert.assertEquals(task.getSegmentGranularity(), task2.getSegmentGranularity());
Assert.assertEquals(
task.getRealtimeIngestionSchema().getDriverConfig().getWindowPeriod(),
task2.getRealtimeIngestionSchema().getDriverConfig().getWindowPeriod()
);
Assert.assertEquals(
task.getRealtimeIngestionSchema().getDataSchema().getGranularitySpec().getSegmentGranularity(),
task2.getRealtimeIngestionSchema().getDataSchema().getGranularitySpec().getSegmentGranularity()
);
}
@Test

View File

@@ -222,6 +222,7 @@ public class TaskLifecycleTest
public void testIndexTask() throws Exception
{
final Task indexTask = new IndexTask(
null,
null,
"foo",
new UniformGranularitySpec(
@@ -230,7 +231,6 @@ public class TaskLifecycleTest
ImmutableList.of(new Interval("2010-01-01/P2D")),
Granularity.DAY
),
null,
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
QueryGranularity.NONE,
10000,
@@ -282,10 +282,10 @@ public class TaskLifecycleTest
public void testIndexTaskFailure() throws Exception
{
final Task indexTask = new IndexTask(
null,
null,
"foo",
new UniformGranularitySpec(Granularity.DAY, null, ImmutableList.of(new Interval("2010-01-01/P1D")), Granularity.DAY),
null,
new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
QueryGranularity.NONE,
10000,

View File

@@ -42,6 +42,7 @@ public class TaskAnnouncementTest
final Task task = new RealtimeIndexTask(
"theid",
new TaskResource("rofl", 2),
null,
new Schema("foo", null, new AggregatorFactory[0], QueryGranularity.NONE, new NoneShardSpec()),
null,
null,

View File

@@ -19,13 +19,44 @@
package io.druid.segment.indexing;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
*/
public interface IngestionSchema
public abstract class IngestionSchema<IOConfigType extends IOConfig, DriverConfigType extends DriverConfig>
{
public DataSchema getDataSchema();
private final DataSchema dataSchema;
private final IOConfigType ioConfig;
private final DriverConfigType driverConfig;
public IOConfig getIOConfig();
@JsonCreator
public IngestionSchema(
@JsonProperty("dataSchema") DataSchema dataSchema,
@JsonProperty("ioConfig") IOConfigType ioConfig,
@JsonProperty("driverConfig") DriverConfigType driverConfig
)
{
this.dataSchema = dataSchema;
this.ioConfig = ioConfig;
this.driverConfig = driverConfig;
}
public DriverConfig getDriverConfig();
@JsonProperty("dataSchema")
public DataSchema getDataSchema()
{
return dataSchema;
}
@JsonProperty("ioConfig")
public IOConfigType getIOConfig()
{
return ioConfig;
}
@JsonProperty("driverConfig")
public DriverConfigType getDriverConfig()
{
return driverConfig;
}
}
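With the base class owning all three parts, a concrete schema reduces to a constructor. A minimal sketch of the pattern (the class name is hypothetical; RealtimeIOConfig and RealtimeDriverConfig are the concrete types this commit pairs in FireDepartment):

package io.druid.segment.indexing;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

// Hypothetical subclass illustrating the pattern used by FireDepartment,
// IndexTask.IndexIngestionSchema and HadoopIngestionSchema in this commit.
public class ExampleIngestionSchema extends IngestionSchema<RealtimeIOConfig, RealtimeDriverConfig>
{
  @JsonCreator
  public ExampleIngestionSchema(
      @JsonProperty("dataSchema") DataSchema dataSchema,
      @JsonProperty("ioConfig") RealtimeIOConfig ioConfig,
      @JsonProperty("driverConfig") RealtimeDriverConfig driverConfig
  )
  {
    super(dataSchema, ioConfig, driverConfig);
    // getDataSchema(), getIOConfig() and getDriverConfig() are inherited and
    // already typed by the generic parameters, so nothing else is needed.
  }
}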

View File

@@ -21,27 +21,53 @@ package io.druid.segment.indexing;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.segment.realtime.plumber.IntervalStartVersioningPolicy;
import io.druid.segment.realtime.plumber.RealtimePlumberSchool;
import io.druid.segment.realtime.plumber.RejectionPolicyFactory;
import io.druid.segment.realtime.plumber.ServerTimeRejectionPolicyFactory;
import io.druid.segment.realtime.plumber.VersioningPolicy;
import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.ShardSpec;
import org.joda.time.Period;
import java.io.File;
/**
*/
public class RealtimeDriverConfig implements DriverConfig
{
private final int maxRowsInMemory;
private final Period intermediatePersistPeriod;
private final Period windowPeriod;
private final File basePersistDirectory;
private final VersioningPolicy versioningPolicy;
private final RejectionPolicyFactory rejectionPolicy;
private final int maxPendingPersists;
private final ShardSpec shardSpec;
@JsonCreator
public RealtimeDriverConfig(
@JsonProperty("maxRowsInMemory") int maxRowsInMemory,
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
@JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("versioningPolicy") VersioningPolicy versioningPolicy,
@JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicy,
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
@JsonProperty("shardSpec") ShardSpec shardSpec
)
{
this.maxRowsInMemory = maxRowsInMemory;
this.intermediatePersistPeriod = intermediatePersistPeriod;
this.maxRowsInMemory = maxRowsInMemory == null ? 500000 : maxRowsInMemory;
this.intermediatePersistPeriod = intermediatePersistPeriod == null
? new Period("PT10M")
: intermediatePersistPeriod;
this.windowPeriod = windowPeriod == null ? this.intermediatePersistPeriod : windowPeriod;
this.basePersistDirectory = basePersistDirectory;
this.versioningPolicy = versioningPolicy == null ? new IntervalStartVersioningPolicy() : versioningPolicy;
this.rejectionPolicy = rejectionPolicy == null ? new ServerTimeRejectionPolicyFactory() : rejectionPolicy;
this.maxPendingPersists = maxPendingPersists == null
? RealtimePlumberSchool.DEFAULT_MAX_PENDING_PERSISTS
: maxPendingPersists;
this.shardSpec = shardSpec == null ? new NoneShardSpec() : shardSpec;
}
@@ -57,9 +83,53 @@ public class RealtimeDriverConfig implements DriverConfig
return intermediatePersistPeriod;
}
@JsonProperty
public Period getWindowPeriod()
{
return windowPeriod;
}
@JsonProperty
public File getBasePersistDirectory()
{
return basePersistDirectory;
}
@JsonProperty
public VersioningPolicy getVersioningPolicy()
{
return versioningPolicy;
}
@JsonProperty
public RejectionPolicyFactory getRejectionPolicyFactory()
{
return rejectionPolicy;
}
@JsonProperty
public int getMaxPendingPersists()
{
return maxPendingPersists;
}
@JsonProperty
public ShardSpec getShardSpec()
{
return shardSpec;
}
public RealtimeDriverConfig withVersioningPolicy(VersioningPolicy policy)
{
return new RealtimeDriverConfig(
maxRowsInMemory,
intermediatePersistPeriod,
windowPeriod,
basePersistDirectory,
policy,
rejectionPolicy,
maxPendingPersists,
shardSpec
);
}
}
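Every constructor argument is now nullable, so the defaults are easiest to read as assertions. A small sketch (hypothetical class; the expected values are the ones hard-coded in the constructor above):

import io.druid.segment.indexing.RealtimeDriverConfig;
import io.druid.segment.realtime.plumber.RealtimePlumberSchool;
import org.joda.time.Period;

public class ExampleDriverConfigDefaults
{
  public static void main(String[] args)
  {
    // All nulls: every field falls back to its default. Run with -ea to check.
    final RealtimeDriverConfig config =
        new RealtimeDriverConfig(null, null, null, null, null, null, null, null);

    assert config.getMaxRowsInMemory() == 500000;
    assert config.getIntermediatePersistPeriod().equals(new Period("PT10M"));
    // windowPeriod inherits the intermediate persist period when unset.
    assert config.getWindowPeriod().equals(new Period("PT10M"));
    assert config.getMaxPendingPersists() == RealtimePlumberSchool.DEFAULT_MAX_PENDING_PERSISTS;
  }
}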

View File

@@ -28,7 +28,6 @@ public class RealtimeIOConfig implements IOConfig
return firehoseFactory;
}
@JsonProperty("plumber")
public PlumberSchool getPlumberSchool()
{
return plumberSchool;

View File

@@ -32,6 +32,7 @@ import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.PlumberSchool;
import io.druid.segment.realtime.plumber.RealtimePlumberSchool;
import org.joda.time.Interval;
import java.io.IOException;
@@ -43,7 +44,7 @@ import java.io.IOException;
* realtime stream of data. The Plumber directs each drop of water from the firehose into the correct sink and makes
* sure that the sinks don't overflow.
*/
public class FireDepartment implements IngestionSchema
public class FireDepartment extends IngestionSchema<RealtimeIOConfig, RealtimeDriverConfig>
{
private final DataSchema dataSchema;
private final RealtimeIOConfig ioConfig;
@@ -63,6 +64,8 @@ public class FireDepartment implements IngestionSchema
@JsonProperty("plumber") PlumberSchool plumberSchool
)
{
super(dataSchema, ioConfig, driverConfig);
// Backwards compatibility
if (dataSchema == null) {
Preconditions.checkNotNull(schema, "schema");
@@ -88,6 +91,11 @@ public class FireDepartment implements IngestionSchema
this.driverConfig = new RealtimeDriverConfig(
config.getMaxRowsInMemory(),
config.getIntermediatePersistPeriod(),
((RealtimePlumberSchool) plumberSchool).getWindowPeriod(),
((RealtimePlumberSchool) plumberSchool).getBasePersistDirectory(),
((RealtimePlumberSchool) plumberSchool).getVersioningPolicy(),
((RealtimePlumberSchool) plumberSchool).getRejectionPolicyFactory(),
((RealtimePlumberSchool) plumberSchool).getMaxPendingPersists(),
schema.getShardSpec()
);
} else {

View File

@@ -14,9 +14,7 @@ import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.server.coordination.DataSegmentAnnouncer;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;
import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
@@ -30,6 +28,8 @@ public class FlushingPlumber extends RealtimePlumber
{
private static final EmittingLogger log = new EmittingLogger(FlushingPlumber.class);
private final DataSchema schema;
private final RealtimeDriverConfig config;
private final Duration flushDuration;
private volatile ScheduledExecutorService flushScheduledExec = null;
@@ -37,41 +37,31 @@ public class FlushingPlumber extends RealtimePlumber
public FlushingPlumber(
Duration flushDuration,
Period windowPeriod,
File basePersistDirectory,
Granularity segmentGranularity,
DataSchema schema,
RealtimeDriverConfig config,
FireDepartmentMetrics metrics,
RejectionPolicy rejectionPolicy,
ServiceEmitter emitter,
QueryRunnerFactoryConglomerate conglomerate,
DataSegmentAnnouncer segmentAnnouncer,
ExecutorService queryExecutorService,
VersioningPolicy versioningPolicy,
int maxPendingPersists
ExecutorService queryExecutorService
)
{
super(
windowPeriod,
basePersistDirectory,
segmentGranularity,
schema,
config,
metrics,
rejectionPolicy,
emitter,
conglomerate,
segmentAnnouncer,
queryExecutorService,
versioningPolicy,
null,
null,
null,
maxPendingPersists
null
);
this.flushDuration = flushDuration;
this.schema = schema;
this.config = config;
}
@Override
@@ -122,15 +112,16 @@ public class FlushingPlumber extends RealtimePlumber
private void startFlushThread()
{
final DateTime truncatedNow = getSegmentGranularity().truncate(new DateTime());
final long windowMillis = getWindowPeriod().toStandardDuration().getMillis();
final Granularity segmentGranularity = schema.getGranularitySpec().getSegmentGranularity();
final DateTime truncatedNow = segmentGranularity.truncate(new DateTime());
final long windowMillis = config.getWindowPeriod().toStandardDuration().getMillis();
log.info(
"Expect to run at [%s]",
new DateTime().plus(
new Duration(
System.currentTimeMillis(),
getSegmentGranularity().increment(truncatedNow).getMillis() + windowMillis
schema.getGranularitySpec().getSegmentGranularity().increment(truncatedNow).getMillis() + windowMillis
)
)
);
@@ -140,9 +131,9 @@ public class FlushingPlumber extends RealtimePlumber
flushScheduledExec,
new Duration(
System.currentTimeMillis(),
getSegmentGranularity().increment(truncatedNow).getMillis() + windowMillis
schema.getGranularitySpec().getSegmentGranularity().increment(truncatedNow).getMillis() + windowMillis
),
new Duration(truncatedNow, getSegmentGranularity().increment(truncatedNow)),
new Duration(truncatedNow, segmentGranularity.increment(truncatedNow)),
new ThreadRenamingCallable<ScheduledExecutors.Signal>(
String.format(
"%s-flusher-%d",
@@ -159,7 +150,7 @@ public class FlushingPlumber extends RealtimePlumber
return ScheduledExecutors.Signal.STOP;
}
long minTimestamp = getSegmentGranularity().truncate(
long minTimestamp = segmentGranularity.truncate(
getRejectionPolicy().getCurrMaxTime().minus(windowMillis)
).getMillis();

View File

@@ -43,15 +43,11 @@ import java.util.concurrent.ExecutorService;
* This plumber just drops segments at the end of a flush duration instead of handing them off. It is only useful if you want to run
* a real time node without the rest of the Druid cluster.
*/
public class FlushingPlumberSchool implements PlumberSchool
public class FlushingPlumberSchool extends RealtimePlumberSchool
{
private static final EmittingLogger log = new EmittingLogger(FlushingPlumberSchool.class);
private final Duration flushDuration;
private final Period windowPeriod;
private final File basePersistDirectory;
private final Granularity segmentGranularity;
private final int maxPendingPersists;
@JacksonInject
@NotNull
@@ -70,9 +66,6 @@ public class FlushingPlumberSchool implements PlumberSchool
@Processing
private volatile ExecutorService queryExecutorService = null;
private volatile VersioningPolicy versioningPolicy = null;
private volatile RejectionPolicyFactory rejectionPolicyFactory = null;
@JsonCreator
public FlushingPlumberSchool(
@JsonProperty("flushDuration") Duration flushDuration,
@@ -81,19 +74,9 @@ public class FlushingPlumberSchool implements PlumberSchool
@JsonProperty("segmentGranularity") Granularity segmentGranularity
)
{
this.flushDuration = flushDuration;
this.windowPeriod = windowPeriod;
this.basePersistDirectory = basePersistDirectory;
this.segmentGranularity = segmentGranularity;
this.versioningPolicy = new IntervalStartVersioningPolicy();
this.rejectionPolicyFactory = new ServerTimeRejectionPolicyFactory();
// Workaround for Jackson issue where if maxPendingPersists is null, all JacksonInjects fail
this.maxPendingPersists = RealtimePlumberSchool.DEFAULT_MAX_PENDING_PERSISTS;
super(windowPeriod, basePersistDirectory, segmentGranularity);
Preconditions.checkNotNull(flushDuration, "FlushingPlumberSchool requires a flushDuration.");
Preconditions.checkNotNull(windowPeriod, "FlushingPlumberSchool requires a windowPeriod.");
Preconditions.checkNotNull(basePersistDirectory, "FlushingPlumberSchool requires a basePersistDirectory.");
Preconditions.checkNotNull(segmentGranularity, "FlushingPlumberSchool requires a segmentGranularity.");
this.flushDuration = flushDuration;
}
@Override
@@ -105,33 +88,18 @@ public class FlushingPlumberSchool implements PlumberSchool
{
verifyState();
final RejectionPolicy rejectionPolicy = rejectionPolicyFactory.create(windowPeriod);
log.info("Creating plumber using rejectionPolicy[%s]", rejectionPolicy);
return new FlushingPlumber(
flushDuration,
windowPeriod,
basePersistDirectory,
segmentGranularity,
schema,
config,
metrics,
rejectionPolicy,
emitter,
conglomerate,
segmentAnnouncer,
queryExecutorService,
versioningPolicy,
maxPendingPersists
queryExecutorService
);
}
@Override
public Granularity getSegmentGranularity()
{
return segmentGranularity;
}
private void verifyState()
{
Preconditions.checkNotNull(conglomerate, "must specify a queryRunnerFactoryConglomerate to do this action.");

View File

@@ -26,9 +26,11 @@ import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeDriverConfig;
import io.druid.segment.realtime.FireDepartmentMetrics;
import java.io.File;
/**
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = RealtimePlumberSchool.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "realtime", value = RealtimePlumberSchool.class),
@JsonSubTypes.Type(name = "flushing", value = FlushingPlumberSchool.class)
@@ -42,5 +44,6 @@ public interface PlumberSchool
*/
public Plumber findPlumber(DataSchema schema, RealtimeDriverConfig config, FireDepartmentMetrics metrics);
@Deprecated
public Granularity getSegmentGranularity();
}

View File

@@ -70,22 +70,17 @@ public class RealtimePlumber implements Plumber
{
private static final EmittingLogger log = new EmittingLogger(RealtimePlumber.class);
private final Period windowPeriod;
private final File basePersistDirectory;
private final Granularity segmentGranularity;
private final DataSchema schema;
private final RealtimeDriverConfig config;
private final FireDepartmentMetrics metrics;
private final RejectionPolicy rejectionPolicy;
private final FireDepartmentMetrics metrics;
private final ServiceEmitter emitter;
private final QueryRunnerFactoryConglomerate conglomerate;
private final DataSegmentAnnouncer segmentAnnouncer;
private final ExecutorService queryExecutorService;
private final VersioningPolicy versioningPolicy;
private final DataSegmentPusher dataSegmentPusher;
private final SegmentPublisher segmentPublisher;
private final ServerView serverView;
private final int maxPendingPersists;
private final Object handoffCondition = new Object();
private final Map<Long, Sink> sinks = Maps.newConcurrentMap();
@@ -99,40 +94,31 @@ public class RealtimePlumber implements Plumber
private volatile ScheduledExecutorService scheduledExecutor = null;
public RealtimePlumber(
Period windowPeriod,
File basePersistDirectory,
Granularity segmentGranularity,
DataSchema schema,
RealtimeDriverConfig config,
FireDepartmentMetrics metrics,
RejectionPolicy rejectionPolicy,
ServiceEmitter emitter,
QueryRunnerFactoryConglomerate conglomerate,
DataSegmentAnnouncer segmentAnnouncer,
ExecutorService queryExecutorService,
VersioningPolicy versioningPolicy,
DataSegmentPusher dataSegmentPusher,
SegmentPublisher segmentPublisher,
ServerView serverView,
int maxPendingPersists
ServerView serverView
)
{
this.windowPeriod = windowPeriod;
this.basePersistDirectory = basePersistDirectory;
this.segmentGranularity = segmentGranularity;
this.schema = schema;
this.config = config;
this.rejectionPolicy = config.getRejectionPolicyFactory().create(config.getWindowPeriod());
this.metrics = metrics;
this.rejectionPolicy = rejectionPolicy;
this.emitter = emitter;
this.conglomerate = conglomerate;
this.segmentAnnouncer = segmentAnnouncer;
this.queryExecutorService = queryExecutorService;
this.versioningPolicy = versioningPolicy;
this.dataSegmentPusher = dataSegmentPusher;
this.segmentPublisher = segmentPublisher;
this.serverView = serverView;
this.maxPendingPersists = maxPendingPersists;
log.info("Creating plumber using rejectionPolicy[%s]", getRejectionPolicy());
}
public DataSchema getSchema()
@@ -145,21 +131,6 @@ public class RealtimePlumber implements Plumber
return config;
}
public Period getWindowPeriod()
{
return windowPeriod;
}
public Granularity getSegmentGranularity()
{
return segmentGranularity;
}
public VersioningPolicy getVersioningPolicy()
{
return versioningPolicy;
}
public RejectionPolicy getRejectionPolicy()
{
return rejectionPolicy;
@@ -187,6 +158,9 @@ public class RealtimePlumber implements Plumber
return null;
}
final Granularity segmentGranularity = schema.getGranularitySpec().getSegmentGranularity();
final VersioningPolicy versioningPolicy = config.getVersioningPolicy();
final long truncatedTime = segmentGranularity.truncate(new DateTime(timestamp)).getMillis();
Sink retVal = sinks.get(truncatedTime);
@@ -434,6 +408,8 @@ public class RealtimePlumber implements Plumber
protected void initializeExecutors()
{
final int maxPendingPersists = config.getMaxPendingPersists();
if (persistExecutor == null) {
// use a blocking single threaded executor to throttle the firehose when write to disk is slow
persistExecutor = Execs.newBlockingSingleThreaded(
@@ -462,6 +438,8 @@ public class RealtimePlumber implements Plumber
protected void bootstrapSinksFromDisk()
{
final VersioningPolicy versioningPolicy = config.getVersioningPolicy();
File baseDir = computeBaseDir(schema);
if (baseDir == null || !baseDir.exists()) {
return;
@@ -554,6 +532,9 @@ public class RealtimePlumber implements Plumber
protected void startPersistThread()
{
final Granularity segmentGranularity = schema.getGranularitySpec().getSegmentGranularity();
final Period windowPeriod = config.getWindowPeriod();
final DateTime truncatedNow = segmentGranularity.truncate(new DateTime());
final long windowMillis = windowPeriod.toStandardDuration().getMillis();
@@ -649,7 +630,7 @@ public class RealtimePlumber implements Plumber
protected File computeBaseDir(DataSchema schema)
{
return new File(basePersistDirectory, schema.getDataSource());
return new File(config.getBasePersistDirectory(), schema.getDataSource());
}
protected File computePersistDir(DataSchema schema, Interval interval)

View File

@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.io.Files;
import com.metamx.common.Granularity;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
@@ -55,7 +56,7 @@ public class RealtimePlumberSchool implements PlumberSchool
@JacksonInject
@NotNull
private volatile ServiceEmitter emitter;
private volatile ServiceEmitter emitter = null;
@JacksonInject
@NotNull
@@ -93,17 +94,44 @@ public class RealtimePlumberSchool implements PlumberSchool
@JsonProperty("segmentGranularity") Granularity segmentGranularity
)
{
this.windowPeriod = windowPeriod;
this.basePersistDirectory = basePersistDirectory;
this.segmentGranularity = segmentGranularity;
this.windowPeriod = windowPeriod == null ? new Period("PT10M") : windowPeriod;
this.basePersistDirectory = basePersistDirectory == null ? Files.createTempDir() : basePersistDirectory;
this.segmentGranularity = segmentGranularity == null ? Granularity.HOUR : segmentGranularity;
this.versioningPolicy = new IntervalStartVersioningPolicy();
this.rejectionPolicyFactory = new ServerTimeRejectionPolicyFactory();
// Workaround for Jackson issue where if maxPendingPersists is null, all JacksonInjects fail
this.maxPendingPersists = RealtimePlumberSchool.DEFAULT_MAX_PENDING_PERSISTS;
Preconditions.checkNotNull(windowPeriod, "RealtimePlumberSchool requires a windowPeriod.");
Preconditions.checkNotNull(basePersistDirectory, "RealtimePlumberSchool requires a basePersistDirectory.");
Preconditions.checkNotNull(segmentGranularity, "RealtimePlumberSchool requires a segmentGranularity.");
Preconditions.checkNotNull(this.windowPeriod, "RealtimePlumberSchool requires a windowPeriod.");
Preconditions.checkNotNull(this.basePersistDirectory, "RealtimePlumberSchool requires a basePersistDirectory.");
Preconditions.checkNotNull(this.segmentGranularity, "RealtimePlumberSchool requires a segmentGranularity.");
}
@JsonProperty
public Period getWindowPeriod()
{
return windowPeriod;
}
@JsonProperty
public File getBasePersistDirectory()
{
return basePersistDirectory;
}
public VersioningPolicy getVersioningPolicy()
{
return versioningPolicy;
}
public RejectionPolicyFactory getRejectionPolicyFactory()
{
return rejectionPolicyFactory;
}
public int getMaxPendingPersists()
{
return maxPendingPersists;
}
@JsonProperty("versioningPolicy")
@@ -153,12 +181,13 @@ public class RealtimePlumberSchool implements PlumberSchool
this.queryExecutorService = executorService;
}
public void setDefaultMaxPendingPersists(int maxPendingPersists)
public void setMaxPendingPersists(int maxPendingPersists)
{
this.maxPendingPersists = maxPendingPersists;
}
@Override
@JsonProperty
public Granularity getSegmentGranularity()
{
return segmentGranularity;
@@ -177,22 +206,16 @@ public class RealtimePlumberSchool implements PlumberSchool
log.info("Creating plumber using rejectionPolicy[%s]", rejectionPolicy);
return new RealtimePlumber(
windowPeriod,
basePersistDirectory,
segmentGranularity,
schema,
config,
metrics,
rejectionPolicy,
emitter,
conglomerate,
segmentAnnouncer,
queryExecutorService,
versioningPolicy,
dataSegmentPusher,
segmentPublisher,
serverView,
maxPendingPersists
serverView
);
}
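The school itself now tolerates an entirely empty spec, which is what the new FireDepartmentTest below relies on when it writes new RealtimePlumberSchool(null, null, null). A sketch of the fallbacks (hypothetical class; values read off the constructor above):

import com.metamx.common.Granularity;
import io.druid.segment.realtime.plumber.RealtimePlumberSchool;
import org.joda.time.Period;

public class ExamplePlumberSchoolDefaults
{
  public static void main(String[] args)
  {
    final RealtimePlumberSchool school = new RealtimePlumberSchool(null, null, null);

    assert school.getWindowPeriod().equals(new Period("PT10M"));
    assert school.getSegmentGranularity() == Granularity.HOUR;
    // basePersistDirectory defaults to a fresh temp dir via Files.createTempDir().
    assert school.getBasePersistDirectory().isDirectory();
  }
}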

View File

@@ -0,0 +1,91 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.realtime;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.common.Granularity;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.JSONParseSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeDriverConfig;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.realtime.plumber.RealtimePlumberSchool;
import junit.framework.Assert;
import org.junit.Test;
import java.util.Arrays;
/**
*/
public class FireDepartmentTest
{
@Test
public void testSerde() throws Exception
{
ObjectMapper jsonMapper = new DefaultObjectMapper();
FireDepartment schema = new FireDepartment(
new DataSchema(
"foo",
new StringInputRowParser(
new JSONParseSpec(
new TimestampSpec(
"timestamp",
"auto"
),
new DimensionsSpec(
Arrays.asList("dim1", "dim2"),
null,
null
)
),
null, null, null, null
),
new AggregatorFactory[]{
new CountAggregatorFactory("count")
},
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.MINUTE, null, Granularity.HOUR)
),
new RealtimeIOConfig(
null,
new RealtimePlumberSchool(
null, null, null
)
),
new RealtimeDriverConfig(
null, null, null, null, null, null, null, null
),
null, null, null, null
);
String json = jsonMapper.writeValueAsString(schema);
FireDepartment newSchema = jsonMapper.readValue(json, FireDepartment.class);
Assert.assertEquals(schema.getDataSchema().getDataSource(), newSchema.getDataSchema().getDataSource());
}
}

View File

@@ -48,6 +48,7 @@ import org.joda.time.Period;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
@@ -107,7 +108,16 @@ public class RealtimeManagerTest
}
}
);
RealtimeDriverConfig driverConfig = new RealtimeDriverConfig(1, new Period("P1Y"), null);
RealtimeDriverConfig driverConfig = new RealtimeDriverConfig(
1,
new Period("P1Y"),
null,
null,
null,
null,
null,
null
);
plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, driverConfig, new DateTime().toString()));
realtimeManager = new RealtimeManager(

View File

@@ -135,14 +135,22 @@ public class RealtimePlumberSchoolTest
realtimePlumberSchool.setConglomerate(new DefaultQueryRunnerFactoryConglomerate(Maps.<Class<? extends Query>, QueryRunnerFactory>newHashMap()));
realtimePlumberSchool.setSegmentAnnouncer(announcer);
realtimePlumberSchool.setSegmentPublisher(segmentPublisher);
realtimePlumberSchool.setRejectionPolicyFactory(new NoopRejectionPolicyFactory());
realtimePlumberSchool.setVersioningPolicy(new IntervalStartVersioningPolicy());
realtimePlumberSchool.setDataSegmentPusher(dataSegmentPusher);
realtimePlumberSchool.setServerView(serverView);
realtimePlumberSchool.setEmitter(emitter);
realtimePlumberSchool.setQueryExecutorService(MoreExecutors.sameThreadExecutor());
RealtimeDriverConfig driverConfig = new RealtimeDriverConfig(1, new Period("P1Y"), null);
RealtimeDriverConfig driverConfig = new RealtimeDriverConfig(
1,
null,
null,
null,
new IntervalStartVersioningPolicy(),
new NoopRejectionPolicyFactory(),
null,
null
);
plumber = realtimePlumberSchool.findPlumber(schema, driverConfig, new FireDepartmentMetrics());
}

View File

@@ -30,7 +30,6 @@ import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeDriverConfig;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.realtime.FireHydrant;
import io.druid.timeline.partition.NoneShardSpec;
import junit.framework.Assert;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@@ -55,41 +54,52 @@ public class SinkTest
final Interval interval = new Interval("2013-01-01/2013-01-02");
final String version = new DateTime().toString();
RealtimeDriverConfig driverConfig = new RealtimeDriverConfig(1, new Period("P1Y"), null);
RealtimeDriverConfig driverConfig = new RealtimeDriverConfig(
1,
new Period("P1Y"),
null,
null,
null,
null,
null,
null
);
final Sink sink = new Sink(interval, schema, driverConfig, version);
sink.add(new InputRow()
{
@Override
public List<String> getDimensions()
{
return Lists.newArrayList();
}
sink.add(
new InputRow()
{
@Override
public List<String> getDimensions()
{
return Lists.newArrayList();
}
@Override
public long getTimestampFromEpoch()
{
return new DateTime("2013-01-01").getMillis();
}
@Override
public long getTimestampFromEpoch()
{
return new DateTime("2013-01-01").getMillis();
}
@Override
public List<String> getDimension(String dimension)
{
return Lists.newArrayList();
}
@Override
public List<String> getDimension(String dimension)
{
return Lists.newArrayList();
}
@Override
public float getFloatMetric(String metric)
{
return 0;
}
@Override
public float getFloatMetric(String metric)
{
return 0;
}
@Override
public Object getRaw(String dimension)
{
return null;
}
});
@Override
public Object getRaw(String dimension)
{
return null;
}
}
);
FireHydrant currHydrant = sink.getCurrIndex();
Assert.assertEquals(new Interval("2013-01-01/PT1M"), currHydrant.getIndex().getInterval());
@@ -97,38 +107,40 @@ public class SinkTest
FireHydrant swapHydrant = sink.swap();
sink.add(new InputRow()
{
@Override
public List<String> getDimensions()
{
return Lists.newArrayList();
}
sink.add(
new InputRow()
{
@Override
public List<String> getDimensions()
{
return Lists.newArrayList();
}
@Override
public long getTimestampFromEpoch()
{
return new DateTime("2013-01-01").getMillis();
}
@Override
public long getTimestampFromEpoch()
{
return new DateTime("2013-01-01").getMillis();
}
@Override
public List<String> getDimension(String dimension)
{
return Lists.newArrayList();
}
@Override
public List<String> getDimension(String dimension)
{
return Lists.newArrayList();
}
@Override
public float getFloatMetric(String metric)
{
return 0;
}
@Override
public float getFloatMetric(String metric)
{
return 0;
}
@Override
public Object getRaw(String dimension)
{
return null;
}
});
@Override
public Object getRaw(String dimension)
{
return null;
}
}
);
Assert.assertEquals(currHydrant, swapHydrant);
Assert.assertNotSame(currHydrant, sink.getCurrIndex());