RealtimeIndexTask: Add minTime to assist in graceful switchover

This commit is contained in:
Gian Merlino 2013-04-19 14:07:17 -07:00
parent 1a6594524f
commit 17205ebea4
3 changed files with 75 additions and 8 deletions

View File

@ -40,8 +40,10 @@ import com.metamx.druid.merger.common.actions.SegmentInsertAction;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.realtime.FireDepartmentConfig;
import com.metamx.druid.realtime.FireDepartmentMetrics;
import com.metamx.druid.realtime.Firehose;
import com.metamx.druid.realtime.FirehoseFactory;
import com.metamx.druid.realtime.GracefulShutdownFirehose;
import com.metamx.druid.realtime.MinTimeFirehose;
import com.metamx.druid.realtime.plumber.Plumber;
import com.metamx.druid.realtime.plumber.RealtimePlumberSchool;
import com.metamx.druid.realtime.Schema;
@ -62,19 +64,22 @@ public class RealtimeIndexTask extends AbstractTask
private static final EmittingLogger log = new EmittingLogger(RealtimeIndexTask.class);
@JsonIgnore
final Schema schema;
private final Schema schema;
@JsonIgnore
final FirehoseFactory firehoseFactory;
private final FirehoseFactory firehoseFactory;
@JsonIgnore
final FireDepartmentConfig fireDepartmentConfig;
private final FireDepartmentConfig fireDepartmentConfig;
@JsonIgnore
final Period windowPeriod;
private final Period windowPeriod;
@JsonIgnore
final IndexGranularity segmentGranularity;
private final IndexGranularity segmentGranularity;
@JsonIgnore
private final DateTime minTime;
@JsonIgnore
private volatile Plumber plumber = null;
@ -95,7 +100,8 @@ public class RealtimeIndexTask extends AbstractTask
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("fireDepartmentConfig") FireDepartmentConfig fireDepartmentConfig, // TODO rename?
@JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("segmentGranularity") IndexGranularity segmentGranularity
@JsonProperty("segmentGranularity") IndexGranularity segmentGranularity,
@JsonProperty("minTime") DateTime minTime
)
{
super(
@ -116,6 +122,7 @@ public class RealtimeIndexTask extends AbstractTask
this.fireDepartmentConfig = fireDepartmentConfig;
this.windowPeriod = windowPeriod;
this.segmentGranularity = segmentGranularity;
this.minTime = minTime;
}
@Override
@ -156,7 +163,19 @@ public class RealtimeIndexTask extends AbstractTask
if (shutdown) {
return TaskStatus.success(getId());
}
firehose = new GracefulShutdownFirehose(firehoseFactory.connect(), segmentGranularity, windowPeriod);
Firehose wrappedFirehose = firehoseFactory.connect();
if (minTime != null) {
log.info("Wrapping firehose in MinTimeFirehose with minTime[%s]", minTime);
wrappedFirehose = new MinTimeFirehose(wrappedFirehose, minTime);
}
log.info(
"Wrapping firehose in GracefulShutdownFirehose with segmentGranularity[%s] and windowPeriod[%s]",
segmentGranularity,
windowPeriod
);
firehose = new GracefulShutdownFirehose(wrappedFirehose, segmentGranularity, windowPeriod);
}
// TODO -- Take PlumberSchool in constructor (although that will need jackson injectables for stuff like

View File

@ -205,7 +205,8 @@ public class TaskSerdeTest
null,
null,
new Period("PT10M"),
IndexGranularity.HOUR
IndexGranularity.HOUR,
null
);
final ObjectMapper jsonMapper = new DefaultObjectMapper();

View File

@ -0,0 +1,47 @@
package com.metamx.druid.realtime;
import com.metamx.druid.input.InputRow;
import org.joda.time.DateTime;
import java.io.IOException;
public class MinTimeFirehose implements Firehose
{
private final Firehose firehose;
private final DateTime minTime;
public MinTimeFirehose(Firehose firehose, DateTime minTime)
{
this.firehose = firehose;
this.minTime = minTime;
}
@Override
public boolean hasMore()
{
return firehose.hasMore();
}
@Override
public InputRow nextRow()
{
while (true) {
final InputRow row = firehose.nextRow();
if (row.getTimestampFromEpoch() >= minTime.getMillis()) {
return row;
}
}
}
@Override
public Runnable commit()
{
return firehose.commit();
}
@Override
public void close() throws IOException
{
firehose.close();
}
}