more logging

This commit is contained in:
Fangjin Yang 2013-03-19 17:08:48 -07:00
parent 6324225a4f
commit c05629e6a4
1 changed files with 8 additions and 1 deletions

View File

@ -3,6 +3,7 @@ package com.metamx.druid.realtime;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.logger.Logger;
import com.metamx.druid.index.v1.IndexGranularity; import com.metamx.druid.index.v1.IndexGranularity;
import com.metamx.druid.input.InputRow; import com.metamx.druid.input.InputRow;
import com.metamx.druid.realtime.plumber.IntervalRejectionPolicyFactory; import com.metamx.druid.realtime.plumber.IntervalRejectionPolicyFactory;
@ -21,6 +22,8 @@ import java.util.concurrent.ScheduledExecutorService;
*/ */
public class GracefulShutdownFirehose implements Firehose public class GracefulShutdownFirehose implements Firehose
{ {
private static final Logger log = new Logger(GracefulShutdownFirehose.class);
private final Firehose firehose; private final Firehose firehose;
private final IndexGranularity segmentGranularity; private final IndexGranularity segmentGranularity;
private final long windowMillis; private final long windowMillis;
@ -55,10 +58,14 @@ public class GracefulShutdownFirehose implements Firehose
public void shutdown() throws IOException public void shutdown() throws IOException
{ {
final long truncatedNow = segmentGranularity.truncate(new DateTime()).getMillis(); final long truncatedNow = segmentGranularity.truncate(new DateTime()).getMillis();
final long end = segmentGranularity.increment(truncatedNow) + windowMillis;
final Duration timeUntilShutdown = new Duration(System.currentTimeMillis(), end);
log.info("Shutting down in %s", timeUntilShutdown);
ScheduledExecutors.scheduleWithFixedDelay( ScheduledExecutors.scheduleWithFixedDelay(
scheduledExecutor, scheduledExecutor,
new Duration(System.currentTimeMillis(), segmentGranularity.increment(truncatedNow) + windowMillis), timeUntilShutdown,
new Callable<ScheduledExecutors.Signal>() new Callable<ScheduledExecutors.Signal>()
{ {
@Override @Override