1) Merge-n-push should only happen after messages have been seen such that it can happen.

This commit is contained in:
Eric Tschetter 2012-11-27 10:48:02 -06:00
parent 63c5b8c790
commit 790ba59244
1 changed files with 16 additions and 4 deletions

View File

@ -241,6 +241,7 @@ public class RealtimePlumberSchool implements PlumberSchool
final long truncatedNow = segmentGranularity.truncate(new DateTime()).getMillis();
final long windowMillis = windowPeriod.toStandardDuration().getMillis();
final RejectionPolicy rejectionPolicy = rejectionPolicyFactory.create(windowPeriod);
log.info(
"Expect to run at [%s]",
@ -261,7 +262,7 @@ public class RealtimePlumberSchool implements PlumberSchool
{
log.info("Starting merge and push.");
long minTimestamp = segmentGranularity.truncate(new DateTime()).getMillis() - windowMillis;
long minTimestamp = segmentGranularity.truncate(rejectionPolicy.getCurrMaxTime()).getMillis() - windowMillis;
List<Map.Entry<Long, Sink>> sinksToPush = Lists.newArrayList();
for (Map.Entry<Long, Sink> entry : sinks.entrySet()) {
@ -340,8 +341,6 @@ public class RealtimePlumberSchool implements PlumberSchool
}
);
final RejectionPolicy rejectionPolicy = rejectionPolicyFactory.create(windowPeriod);
return new Plumber()
{
@Override
@ -537,6 +536,7 @@ public class RealtimePlumberSchool implements PlumberSchool
public interface RejectionPolicy
{
public DateTime getCurrMaxTime();
public boolean accept(long timestamp);
}
@ -559,6 +559,12 @@ public class RealtimePlumberSchool implements PlumberSchool
return new RejectionPolicy()
{
@Override
public DateTime getCurrMaxTime()
{
return new DateTime();
}
@Override
public boolean accept(long timestamp)
{
@ -577,7 +583,13 @@ public class RealtimePlumberSchool implements PlumberSchool
return new RejectionPolicy()
{
long maxTimestamp = Long.MIN_VALUE;
private volatile long maxTimestamp = Long.MIN_VALUE;
@Override
public DateTime getCurrMaxTime()
{
return new DateTime(maxTimestamp);
}
@Override
public boolean accept(long timestamp)