More controllable realtime shutdown

- Realtime plumber will start persisting segments shortly after finishJob is called, regardless
  of rejection policy

- Add "none" rejection policy
This commit is contained in:
Gian Merlino 2013-07-24 15:29:40 -07:00
parent 0eebb0a149
commit 5c96d500d8
3 changed files with 33 additions and 2 deletions

View File

@ -0,0 +1,26 @@
package com.metamx.druid.realtime.plumber;
import org.joda.time.DateTime;
import org.joda.time.Period;
public class NoopRejectionPolicyFactory implements RejectionPolicyFactory
{
@Override
public RejectionPolicy create(Period windowPeriod)
{
return new RejectionPolicy()
{
@Override
public DateTime getCurrMaxTime()
{
return new DateTime(0);
}
@Override
public boolean accept(long timestamp)
{
return true;
}
};
}
}

View File

@ -105,6 +105,8 @@ public class RealtimePlumberSchool implements PlumberSchool
private volatile SegmentPublisher segmentPublisher = null; private volatile SegmentPublisher segmentPublisher = null;
private volatile ServerView serverView = null; private volatile ServerView serverView = null;
private volatile boolean noMoreData = false;
@JsonCreator @JsonCreator
public RealtimePlumberSchool( public RealtimePlumberSchool(
@JsonProperty("windowPeriod") Period windowPeriod, @JsonProperty("windowPeriod") Period windowPeriod,
@ -324,6 +326,8 @@ public class RealtimePlumberSchool implements PlumberSchool
{ {
log.info("Shutting down..."); log.info("Shutting down...");
noMoreData = true;
while (!sinks.isEmpty()) { while (!sinks.isEmpty()) {
try { try {
log.info( log.info(
@ -553,7 +557,7 @@ public class RealtimePlumberSchool implements PlumberSchool
List<Map.Entry<Long, Sink>> sinksToPush = Lists.newArrayList(); List<Map.Entry<Long, Sink>> sinksToPush = Lists.newArrayList();
for (Map.Entry<Long, Sink> entry : sinks.entrySet()) { for (Map.Entry<Long, Sink> entry : sinks.entrySet()) {
final Long intervalStart = entry.getKey(); final Long intervalStart = entry.getKey();
if (intervalStart < minTimestamp) { if (noMoreData || intervalStart < minTimestamp) {
log.info("Adding entry[%s] for merge and push.", entry); log.info("Adding entry[%s] for merge and push.", entry);
sinksToPush.add(entry); sinksToPush.add(entry);
} }

View File

@ -7,7 +7,8 @@ import org.joda.time.Period;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = { @JsonSubTypes(value = {
@JsonSubTypes.Type(name = "serverTime", value = ServerTimeRejectionPolicyFactory.class), @JsonSubTypes.Type(name = "serverTime", value = ServerTimeRejectionPolicyFactory.class),
@JsonSubTypes.Type(name = "messageTime", value = MessageTimeRejectionPolicyFactory.class) @JsonSubTypes.Type(name = "messageTime", value = MessageTimeRejectionPolicyFactory.class),
@JsonSubTypes.Type(name = "none", value = NoopRejectionPolicyFactory.class)
}) })
public interface RejectionPolicyFactory public interface RejectionPolicyFactory
{ {