From 790ba592444d7aa35be6bf6de302b115d8828c5f Mon Sep 17 00:00:00 2001 From: Eric Tschetter Date: Tue, 27 Nov 2012 10:48:02 -0600 Subject: [PATCH] 1) Merge-n-push should only happen after messages have been seen such that it can happen. --- .../druid/realtime/RealtimePlumberSchool.java | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/realtime/src/main/java/com/metamx/druid/realtime/RealtimePlumberSchool.java b/realtime/src/main/java/com/metamx/druid/realtime/RealtimePlumberSchool.java index 4fdeaa02dcf..c119b2c12e0 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/RealtimePlumberSchool.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/RealtimePlumberSchool.java @@ -241,6 +241,7 @@ public class RealtimePlumberSchool implements PlumberSchool final long truncatedNow = segmentGranularity.truncate(new DateTime()).getMillis(); final long windowMillis = windowPeriod.toStandardDuration().getMillis(); + final RejectionPolicy rejectionPolicy = rejectionPolicyFactory.create(windowPeriod); log.info( "Expect to run at [%s]", @@ -261,7 +262,7 @@ public class RealtimePlumberSchool implements PlumberSchool { log.info("Starting merge and push."); - long minTimestamp = segmentGranularity.truncate(new DateTime()).getMillis() - windowMillis; + long minTimestamp = segmentGranularity.truncate(rejectionPolicy.getCurrMaxTime()).getMillis() - windowMillis; List> sinksToPush = Lists.newArrayList(); for (Map.Entry entry : sinks.entrySet()) { @@ -340,8 +341,6 @@ public class RealtimePlumberSchool implements PlumberSchool } ); - final RejectionPolicy rejectionPolicy = rejectionPolicyFactory.create(windowPeriod); - return new Plumber() { @Override @@ -537,6 +536,7 @@ public class RealtimePlumberSchool implements PlumberSchool public interface RejectionPolicy { + public DateTime getCurrMaxTime(); public boolean accept(long timestamp); } @@ -559,6 +559,12 @@ public class RealtimePlumberSchool implements PlumberSchool return new RejectionPolicy() { + @Override + public DateTime getCurrMaxTime() + { + return new DateTime(); + } + @Override public boolean accept(long timestamp) { @@ -577,7 +583,13 @@ public class RealtimePlumberSchool implements PlumberSchool return new RejectionPolicy() { - long maxTimestamp = Long.MIN_VALUE; + private volatile long maxTimestamp = Long.MIN_VALUE; + + @Override + public DateTime getCurrMaxTime() + { + return new DateTime(maxTimestamp); + } @Override public boolean accept(long timestamp)