fix another bug in LQP with concurrency

This commit is contained in:
fjy 2014-08-08 15:06:03 -07:00
parent 35937132a7
commit 68827ff88b
1 changed files with 21 additions and 18 deletions

View File

@ -44,6 +44,7 @@ import java.util.Collection;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
@ -80,10 +81,10 @@ public class LoadQueuePeon
private final AtomicLong queuedSize = new AtomicLong(0); private final AtomicLong queuedSize = new AtomicLong(0);
private final AtomicInteger failedAssignCount = new AtomicInteger(0); private final AtomicInteger failedAssignCount = new AtomicInteger(0);
private final ConcurrentSkipListSet<SegmentHolder> segmentsToLoad = new ConcurrentSkipListSet<SegmentHolder>( private final ConcurrentSkipListMap<SegmentHolder, SegmentHolder> segmentsToLoad = new ConcurrentSkipListMap<>(
segmentHolderComparator segmentHolderComparator
); );
private final ConcurrentSkipListSet<SegmentHolder> segmentsToDrop = new ConcurrentSkipListSet<SegmentHolder>( private final ConcurrentSkipListMap<SegmentHolder, SegmentHolder> segmentsToDrop = new ConcurrentSkipListMap<>(
segmentHolderComparator segmentHolderComparator
); );
@ -111,9 +112,9 @@ public class LoadQueuePeon
@JsonProperty @JsonProperty
public Set<DataSegment> getSegmentsToLoad() public Set<DataSegment> getSegmentsToLoad()
{ {
return new ConcurrentSkipListSet<DataSegment>( return new ConcurrentSkipListSet<>(
Collections2.transform( Collections2.transform(
segmentsToLoad, segmentsToLoad.keySet(),
new Function<SegmentHolder, DataSegment>() new Function<SegmentHolder, DataSegment>()
{ {
@Override @Override
@ -129,9 +130,9 @@ public class LoadQueuePeon
@JsonProperty @JsonProperty
public Set<DataSegment> getSegmentsToDrop() public Set<DataSegment> getSegmentsToDrop()
{ {
return new ConcurrentSkipListSet<DataSegment>( return new ConcurrentSkipListSet<>(
Collections2.transform( Collections2.transform(
segmentsToDrop, segmentsToDrop.keySet(),
new Function<SegmentHolder, DataSegment>() new Function<SegmentHolder, DataSegment>()
{ {
@Override @Override
@ -169,12 +170,13 @@ public class LoadQueuePeon
} }
} }
SegmentHolder holder = new SegmentHolder(segment, LOAD, Arrays.asList(callback)); final SegmentHolder holder = new SegmentHolder(segment, LOAD, Arrays.asList(callback));
synchronized (lock) { synchronized (lock) {
if (segmentsToLoad.contains(holder)) { final SegmentHolder existingHolder = segmentsToLoad.get(holder);
if (existingHolder != null) {
if ((callback != null)) { if ((callback != null)) {
holder.addCallback(callback); existingHolder.addCallback(callback);
} }
return; return;
} }
@ -182,7 +184,7 @@ public class LoadQueuePeon
log.info("Asking server peon[%s] to load segment[%s]", basePath, segment.getIdentifier()); log.info("Asking server peon[%s] to load segment[%s]", basePath, segment.getIdentifier());
queuedSize.addAndGet(segment.getSize()); queuedSize.addAndGet(segment.getSize());
segmentsToLoad.add(holder); segmentsToLoad.put(holder, holder);
doNext(); doNext();
} }
@ -204,16 +206,17 @@ public class LoadQueuePeon
SegmentHolder holder = new SegmentHolder(segment, DROP, Arrays.asList(callback)); SegmentHolder holder = new SegmentHolder(segment, DROP, Arrays.asList(callback));
synchronized (lock) { synchronized (lock) {
if (segmentsToDrop.contains(holder)) { final SegmentHolder existingHolder = segmentsToDrop.get(holder);
if (existingHolder != null) {
if (callback != null) { if (callback != null) {
holder.addCallback(callback); existingHolder.addCallback(callback);
} }
return; return;
} }
} }
log.info("Asking server peon[%s] to drop segment[%s]", basePath, segment.getIdentifier()); log.info("Asking server peon[%s] to drop segment[%s]", basePath, segment.getIdentifier());
segmentsToDrop.add(holder); segmentsToDrop.put(holder, holder);
doNext(); doNext();
} }
@ -222,10 +225,10 @@ public class LoadQueuePeon
synchronized (lock) { synchronized (lock) {
if (currentlyProcessing == null) { if (currentlyProcessing == null) {
if (!segmentsToDrop.isEmpty()) { if (!segmentsToDrop.isEmpty()) {
currentlyProcessing = segmentsToDrop.first(); currentlyProcessing = segmentsToDrop.firstKey();
log.info("Server[%s] dropping [%s]", basePath, currentlyProcessing.getSegmentIdentifier()); log.info("Server[%s] dropping [%s]", basePath, currentlyProcessing.getSegmentIdentifier());
} else if (!segmentsToLoad.isEmpty()) { } else if (!segmentsToLoad.isEmpty()) {
currentlyProcessing = segmentsToLoad.first(); currentlyProcessing = segmentsToLoad.firstKey();
log.info("Server[%s] loading [%s]", basePath, currentlyProcessing.getSegmentIdentifier()); log.info("Server[%s] loading [%s]", basePath, currentlyProcessing.getSegmentIdentifier());
} else { } else {
return; return;
@ -337,7 +340,7 @@ public class LoadQueuePeon
default: default:
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
callBackExecutor.execute( callBackExecutor.execute(
new Runnable() new Runnable()
{ {
@ -361,14 +364,14 @@ public class LoadQueuePeon
} }
if (!segmentsToDrop.isEmpty()) { if (!segmentsToDrop.isEmpty()) {
for (SegmentHolder holder : segmentsToDrop) { for (SegmentHolder holder : segmentsToDrop.keySet()) {
holder.executeCallbacks(); holder.executeCallbacks();
} }
} }
segmentsToDrop.clear(); segmentsToDrop.clear();
if (!segmentsToLoad.isEmpty()) { if (!segmentsToLoad.isEmpty()) {
for (SegmentHolder holder : segmentsToLoad) { for (SegmentHolder holder : segmentsToLoad.keySet()) {
holder.executeCallbacks(); holder.executeCallbacks();
} }
} }