From faa74ce2cbad8876e25ef151c7f5188a89f77407 Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Thu, 1 May 2014 01:38:03 +0530 Subject: [PATCH] hasSwapped properly check protect check for hasSwapped. --- .../segment/realtime/plumber/RealtimePlumber.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index b5a93e68942..2b7e3625896 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -313,10 +313,12 @@ public class RealtimePlumber implements Plumber final Interval interval = sink.getInterval(); for (FireHydrant hydrant : sink) { - if (!hydrant.hasSwapped()) { - log.info("Hydrant[%s] hasn't swapped yet, swapping. Sink[%s]", hydrant, sink); - final int rowCount = persistHydrant(hydrant, schema, interval); - metrics.incrementRowOutputCount(rowCount); + synchronized (hydrant) { + if (!hydrant.hasSwapped()) { + log.info("Hydrant[%s] hasn't swapped yet, swapping. Sink[%s]", hydrant, sink); + final int rowCount = persistHydrant(hydrant, schema, interval); + metrics.incrementRowOutputCount(rowCount); + } } }