From c48c4ac0f6d18af15334bdbdee2e7b34bfabdd88 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Thu, 21 Aug 2014 09:44:55 +0530 Subject: [PATCH] handle arithmeticEx --- .../realtime/plumber/RealtimePlumber.java | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index bf1b575ff41..e3765505147 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -72,10 +72,8 @@ import java.util.concurrent.ScheduledExecutorService; public class RealtimePlumber implements Plumber { private static final EmittingLogger log = new EmittingLogger(RealtimePlumber.class); - private final DataSchema schema; private final RealtimeTuningConfig config; - private final RejectionPolicy rejectionPolicy; private final FireDepartmentMetrics metrics; private final ServiceEmitter emitter; @@ -610,10 +608,14 @@ public class RealtimePlumber implements Plumber final long windowMillis = windowPeriod.toStandardDuration().getMillis(); log.info("Starting merge and push."); - - DateTime minTimestampAsDate = segmentGranularity.truncate( - rejectionPolicy.getCurrMaxTime().minus(windowMillis) - ); + DateTime minTimestampAsDate; + try { + minTimestampAsDate = segmentGranularity.truncate(rejectionPolicy.getCurrMaxTime().minus(windowMillis)); + } + catch (ArithmeticException e) { + //caused when rejectionPolicy.currMaxTime minus windowMillis exceeds the capacity of long + minTimestampAsDate = segmentGranularity.truncate(rejectionPolicy.getCurrMaxTime()); + } long minTimestamp = minTimestampAsDate.getMillis(); log.info("Found [%,d] sinks. minTimestamp [%s]", sinks.size(), minTimestampAsDate); @@ -785,13 +787,13 @@ public class RealtimePlumber implements Plumber && config.getShardSpec().getPartitionNum() == segment.getShardSpec().getPartitionNum() && Iterables.any( sinks.keySet(), new Predicate() - { - @Override - public boolean apply(Long sinkKey) - { - return segment.getInterval().contains(sinkKey); - } - } + { + @Override + public boolean apply(Long sinkKey) + { + return segment.getInterval().contains(sinkKey); + } + } ); } }