fix synchronized on field getting updated

This commit is contained in:
Xavier Léauté 2014-09-15 13:56:31 -07:00
parent 4b4de83f21
commit 02892233ac
1 changed files with 6 additions and 11 deletions

View File

@ -50,6 +50,7 @@ public class Sink implements Iterable<FireHydrant>
private static final Logger log = new Logger(Sink.class);
private volatile FireHydrant currHydrant;
private final transient Object hydrantLock = new Object();
private final Interval interval;
private final DataSchema schema;
@ -117,7 +118,7 @@ public class Sink implements Iterable<FireHydrant>
throw new IAE("No currHydrant but given row[%s]", row);
}
synchronized (currHydrant) {
synchronized (hydrantLock) {
IncrementalIndex index = currHydrant.getIndex();
if (index == null) {
return -1; // the hydrant was swapped without being replaced
@ -128,7 +129,7 @@ public class Sink implements Iterable<FireHydrant>
public boolean isEmpty()
{
synchronized (currHydrant) {
synchronized (hydrantLock) {
return hydrants.size() == 1 && currHydrant.getIndex().isEmpty();
}
}
@ -145,7 +146,7 @@ public class Sink implements Iterable<FireHydrant>
public boolean swappable()
{
synchronized (currHydrant) {
synchronized (hydrantLock) {
return currHydrant.getIndex() != null && currHydrant.getIndex().size() != 0;
}
}
@ -185,17 +186,11 @@ public class Sink implements Iterable<FireHydrant>
.build()
);
FireHydrant old;
if (currHydrant == null) { // Only happens on initialization, cannot synchronize on null
final FireHydrant old;
synchronized (hydrantLock) {
old = currHydrant;
currHydrant = new FireHydrant(newIndex, hydrants.size(), getSegment().getIdentifier());
hydrants.add(currHydrant);
} else {
synchronized (currHydrant) {
old = currHydrant;
currHydrant = new FireHydrant(newIndex, hydrants.size(), getSegment().getIdentifier());
hydrants.add(currHydrant);
}
}
return old;