handle arithmeticEx

This commit is contained in:
nishantmonu51 2014-08-21 09:44:55 +05:30
parent 4e3f4fbc22
commit c48c4ac0f6
1 changed files with 15 additions and 13 deletions

View File

@ -72,10 +72,8 @@ import java.util.concurrent.ScheduledExecutorService;
public class RealtimePlumber implements Plumber
{
private static final EmittingLogger log = new EmittingLogger(RealtimePlumber.class);
private final DataSchema schema;
private final RealtimeTuningConfig config;
private final RejectionPolicy rejectionPolicy;
private final FireDepartmentMetrics metrics;
private final ServiceEmitter emitter;
@ -610,10 +608,14 @@ public class RealtimePlumber implements Plumber
final long windowMillis = windowPeriod.toStandardDuration().getMillis();
log.info("Starting merge and push.");
DateTime minTimestampAsDate = segmentGranularity.truncate(
rejectionPolicy.getCurrMaxTime().minus(windowMillis)
);
DateTime minTimestampAsDate;
try {
minTimestampAsDate = segmentGranularity.truncate(rejectionPolicy.getCurrMaxTime().minus(windowMillis));
}
catch (ArithmeticException e) {
//caused when rejectionPolicy.currMaxTime minus windowMillis exceeds the capacity of long
minTimestampAsDate = segmentGranularity.truncate(rejectionPolicy.getCurrMaxTime());
}
long minTimestamp = minTimestampAsDate.getMillis();
log.info("Found [%,d] sinks. minTimestamp [%s]", sinks.size(), minTimestampAsDate);
@ -785,13 +787,13 @@ public class RealtimePlumber implements Plumber
&& config.getShardSpec().getPartitionNum() == segment.getShardSpec().getPartitionNum()
&& Iterables.any(
sinks.keySet(), new Predicate<Long>()
{
@Override
public boolean apply(Long sinkKey)
{
return segment.getInterval().contains(sinkKey);
}
}
{
@Override
public boolean apply(Long sinkKey)
{
return segment.getInterval().contains(sinkKey);
}
}
);
}
}