mirror of https://github.com/apache/druid.git
fix shutdown problem
This commit is contained in:
parent
dfe91d3bf9
commit
9cdfea31b2
|
@ -8,6 +8,7 @@ import com.metamx.druid.index.v1.IndexGranularity;
|
||||||
import com.metamx.druid.input.InputRow;
|
import com.metamx.druid.input.InputRow;
|
||||||
import com.metamx.druid.realtime.plumber.IntervalRejectionPolicyFactory;
|
import com.metamx.druid.realtime.plumber.IntervalRejectionPolicyFactory;
|
||||||
import com.metamx.druid.realtime.plumber.RejectionPolicy;
|
import com.metamx.druid.realtime.plumber.RejectionPolicy;
|
||||||
|
import org.apache.commons.lang.mutable.MutableBoolean;
|
||||||
import org.joda.time.DateTime;
|
import org.joda.time.DateTime;
|
||||||
import org.joda.time.Duration;
|
import org.joda.time.Duration;
|
||||||
import org.joda.time.Interval;
|
import org.joda.time.Interval;
|
||||||
|
@ -30,6 +31,7 @@ public class GracefulShutdownFirehose implements Firehose
|
||||||
private final ScheduledExecutorService scheduledExecutor;
|
private final ScheduledExecutorService scheduledExecutor;
|
||||||
private final RejectionPolicy rejectionPolicy;
|
private final RejectionPolicy rejectionPolicy;
|
||||||
|
|
||||||
|
private final MutableBoolean hasMore = new MutableBoolean(true);
|
||||||
private volatile boolean shutdown = false;
|
private volatile boolean shutdown = false;
|
||||||
|
|
||||||
public GracefulShutdownFirehose(
|
public GracefulShutdownFirehose(
|
||||||
|
@ -72,7 +74,7 @@ public class GracefulShutdownFirehose implements Firehose
|
||||||
public ScheduledExecutors.Signal call() throws Exception
|
public ScheduledExecutors.Signal call() throws Exception
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
firehose.close();
|
hasMore.setValue(false);
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
throw Throwables.propagate(e);
|
throw Throwables.propagate(e);
|
||||||
|
@ -89,7 +91,7 @@ public class GracefulShutdownFirehose implements Firehose
|
||||||
@Override
|
@Override
|
||||||
public boolean hasMore()
|
public boolean hasMore()
|
||||||
{
|
{
|
||||||
return firehose.hasMore();
|
return hasMore.booleanValue() && firehose.hasMore();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue