hasSwapped properly check

protect check for hasSwapped.
This commit is contained in:
nishantmonu51 2014-05-01 01:38:03 +05:30
parent ff56573910
commit faa74ce2cb
1 changed files with 6 additions and 4 deletions

View File

@ -313,12 +313,14 @@ public class RealtimePlumber implements Plumber
final Interval interval = sink.getInterval(); final Interval interval = sink.getInterval();
for (FireHydrant hydrant : sink) { for (FireHydrant hydrant : sink) {
synchronized (hydrant) {
if (!hydrant.hasSwapped()) { if (!hydrant.hasSwapped()) {
log.info("Hydrant[%s] hasn't swapped yet, swapping. Sink[%s]", hydrant, sink); log.info("Hydrant[%s] hasn't swapped yet, swapping. Sink[%s]", hydrant, sink);
final int rowCount = persistHydrant(hydrant, schema, interval); final int rowCount = persistHydrant(hydrant, schema, interval);
metrics.incrementRowOutputCount(rowCount); metrics.incrementRowOutputCount(rowCount);
} }
} }
}
final File mergedTarget = new File(computePersistDir(schema, interval), "merged"); final File mergedTarget = new File(computePersistDir(schema, interval), "merged");
if (mergedTarget.exists()) { if (mergedTarget.exists()) {