Merge pull request #736 from metamx/fix-synchronized

fix synchronized on field getting updated
This commit is contained in:
fjy 2014-09-16 11:06:35 -06:00
commit 1dbb6fd4b0
1 changed files with 6 additions and 11 deletions

View File

@ -50,6 +50,7 @@ public class Sink implements Iterable<FireHydrant>
private static final Logger log = new Logger(Sink.class);
private volatile FireHydrant currHydrant;
private final Object hydrantLock = new Object();
private final Interval interval;
private final DataSchema schema;
@ -117,7 +118,7 @@ public class Sink implements Iterable<FireHydrant>
throw new IAE("No currHydrant but given row[%s]", row);
}
synchronized (currHydrant) {
synchronized (hydrantLock) {
IncrementalIndex index = currHydrant.getIndex();
if (index == null) {
return -1; // the hydrant was swapped without being replaced
@ -128,7 +129,7 @@ public class Sink implements Iterable<FireHydrant>
public boolean isEmpty()
{
synchronized (currHydrant) {
synchronized (hydrantLock) {
return hydrants.size() == 1 && currHydrant.getIndex().isEmpty();
}
}
@ -145,7 +146,7 @@ public class Sink implements Iterable<FireHydrant>
public boolean swappable()
{
synchronized (currHydrant) {
synchronized (hydrantLock) {
return currHydrant.getIndex() != null && currHydrant.getIndex().size() != 0;
}
}
@ -185,17 +186,11 @@ public class Sink implements Iterable<FireHydrant>
.build()
);
FireHydrant old;
if (currHydrant == null) { // Only happens on initialization, cannot synchronize on null
final FireHydrant old;
synchronized (hydrantLock) {
old = currHydrant;
currHydrant = new FireHydrant(newIndex, hydrants.size(), getSegment().getIdentifier());
hydrants.add(currHydrant);
} else {
synchronized (currHydrant) {
old = currHydrant;
currHydrant = new FireHydrant(newIndex, hydrants.size(), getSegment().getIdentifier());
hydrants.add(currHydrant);
}
}
return old;