diff --git a/realtime/src/main/java/com/metamx/druid/realtime/RealtimePlumberSchool.java b/realtime/src/main/java/com/metamx/druid/realtime/RealtimePlumberSchool.java index 4fdeaa02dcf..c119b2c12e0 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/RealtimePlumberSchool.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/RealtimePlumberSchool.java @@ -241,6 +241,7 @@ public class RealtimePlumberSchool implements PlumberSchool final long truncatedNow = segmentGranularity.truncate(new DateTime()).getMillis(); final long windowMillis = windowPeriod.toStandardDuration().getMillis(); + final RejectionPolicy rejectionPolicy = rejectionPolicyFactory.create(windowPeriod); log.info( "Expect to run at [%s]", @@ -261,7 +262,7 @@ public class RealtimePlumberSchool implements PlumberSchool { log.info("Starting merge and push."); - long minTimestamp = segmentGranularity.truncate(new DateTime()).getMillis() - windowMillis; + long minTimestamp = segmentGranularity.truncate(rejectionPolicy.getCurrMaxTime()).getMillis() - windowMillis; List> sinksToPush = Lists.newArrayList(); for (Map.Entry entry : sinks.entrySet()) { @@ -340,8 +341,6 @@ public class RealtimePlumberSchool implements PlumberSchool } ); - final RejectionPolicy rejectionPolicy = rejectionPolicyFactory.create(windowPeriod); - return new Plumber() { @Override @@ -537,6 +536,7 @@ public class RealtimePlumberSchool implements PlumberSchool public interface RejectionPolicy { + public DateTime getCurrMaxTime(); public boolean accept(long timestamp); } @@ -559,6 +559,12 @@ public class RealtimePlumberSchool implements PlumberSchool return new RejectionPolicy() { + @Override + public DateTime getCurrMaxTime() + { + return new DateTime(); + } + @Override public boolean accept(long timestamp) { @@ -577,7 +583,13 @@ public class RealtimePlumberSchool implements PlumberSchool return new RejectionPolicy() { - long maxTimestamp = Long.MIN_VALUE; + private volatile long maxTimestamp = Long.MIN_VALUE; + + @Override + public DateTime getCurrMaxTime() + { + return new DateTime(maxTimestamp); + } @Override public boolean accept(long timestamp)