From 02892233acd262547da37fd4d95c52c418df0da3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Mon, 15 Sep 2014 13:56:31 -0700 Subject: [PATCH 1/2] fix synchronized on field getting updated --- .../io/druid/segment/realtime/plumber/Sink.java | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java index 04e485793c1..8b4afa7904b 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java @@ -50,6 +50,7 @@ public class Sink implements Iterable private static final Logger log = new Logger(Sink.class); private volatile FireHydrant currHydrant; + private final transient Object hydrantLock = new Object(); private final Interval interval; private final DataSchema schema; @@ -117,7 +118,7 @@ public class Sink implements Iterable throw new IAE("No currHydrant but given row[%s]", row); } - synchronized (currHydrant) { + synchronized (hydrantLock) { IncrementalIndex index = currHydrant.getIndex(); if (index == null) { return -1; // the hydrant was swapped without being replaced @@ -128,7 +129,7 @@ public class Sink implements Iterable public boolean isEmpty() { - synchronized (currHydrant) { + synchronized (hydrantLock) { return hydrants.size() == 1 && currHydrant.getIndex().isEmpty(); } } @@ -145,7 +146,7 @@ public class Sink implements Iterable public boolean swappable() { - synchronized (currHydrant) { + synchronized (hydrantLock) { return currHydrant.getIndex() != null && currHydrant.getIndex().size() != 0; } } @@ -185,17 +186,11 @@ public class Sink implements Iterable .build() ); - FireHydrant old; - if (currHydrant == null) { // Only happens on initialization, cannot synchronize on null + final FireHydrant old; + synchronized (hydrantLock) { old = currHydrant; currHydrant = new FireHydrant(newIndex, hydrants.size(), getSegment().getIdentifier()); hydrants.add(currHydrant); - } else { - synchronized (currHydrant) { - old = currHydrant; - currHydrant = new FireHydrant(newIndex, hydrants.size(), getSegment().getIdentifier()); - hydrants.add(currHydrant); - } } return old; From 762485a0c9655f7599720e67d3759d3fc9650637 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Tue, 16 Sep 2014 09:59:25 -0700 Subject: [PATCH 2/2] remove unnecessary transient --- .../src/main/java/io/druid/segment/realtime/plumber/Sink.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java index 8b4afa7904b..cb95217a23a 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java @@ -50,7 +50,7 @@ public class Sink implements Iterable private static final Logger log = new Logger(Sink.class); private volatile FireHydrant currHydrant; - private final transient Object hydrantLock = new Object(); + private final Object hydrantLock = new Object(); private final Interval interval; private final DataSchema schema;