diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java index 04e485793c1..cb95217a23a 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java @@ -50,6 +50,7 @@ public class Sink implements Iterable private static final Logger log = new Logger(Sink.class); private volatile FireHydrant currHydrant; + private final Object hydrantLock = new Object(); private final Interval interval; private final DataSchema schema; @@ -117,7 +118,7 @@ public class Sink implements Iterable throw new IAE("No currHydrant but given row[%s]", row); } - synchronized (currHydrant) { + synchronized (hydrantLock) { IncrementalIndex index = currHydrant.getIndex(); if (index == null) { return -1; // the hydrant was swapped without being replaced @@ -128,7 +129,7 @@ public class Sink implements Iterable public boolean isEmpty() { - synchronized (currHydrant) { + synchronized (hydrantLock) { return hydrants.size() == 1 && currHydrant.getIndex().isEmpty(); } } @@ -145,7 +146,7 @@ public class Sink implements Iterable public boolean swappable() { - synchronized (currHydrant) { + synchronized (hydrantLock) { return currHydrant.getIndex() != null && currHydrant.getIndex().size() != 0; } } @@ -185,17 +186,11 @@ public class Sink implements Iterable .build() ); - FireHydrant old; - if (currHydrant == null) { // Only happens on initialization, cannot synchronize on null + final FireHydrant old; + synchronized (hydrantLock) { old = currHydrant; currHydrant = new FireHydrant(newIndex, hydrants.size(), getSegment().getIdentifier()); hydrants.add(currHydrant); - } else { - synchronized (currHydrant) { - old = currHydrant; - currHydrant = new FireHydrant(newIndex, hydrants.size(), getSegment().getIdentifier()); - hydrants.add(currHydrant); - } } return old;