concurrency my old foe, we meet again

This commit is contained in:
fjy 2014-08-09 19:33:07 -07:00
parent d58ae945a4
commit 0e1a8f6f02
1 changed files with 26 additions and 25 deletions

View File

@ -57,7 +57,16 @@ public class LoadQueuePeon
private static final int DROP = 0;
private static final int LOAD = 1;
private static Comparator<DataSegment> segmentHolderComparator = Comparators.inverse(DataSegment.bucketMonthComparator());
private static Comparator<DataSegment> segmentComparator = Comparators.inverse(DataSegment.bucketMonthComparator());
private static void executeCallbacks(List<LoadPeonCallback> callbacks)
{
for (LoadPeonCallback callback : callbacks) {
if (callback != null) {
callback.execute();
}
}
}
private final CuratorFramework curator;
private final String basePath;
@ -70,10 +79,10 @@ public class LoadQueuePeon
private final AtomicInteger failedAssignCount = new AtomicInteger(0);
private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToLoad = new ConcurrentSkipListMap<>(
segmentHolderComparator
segmentComparator
);
private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToDrop = new ConcurrentSkipListMap<>(
segmentHolderComparator
segmentComparator
);
private final Object lock = new Object();
@ -120,8 +129,8 @@ public class LoadQueuePeon
}
public void loadSegment(
DataSegment segment,
LoadPeonCallback callback
final DataSegment segment,
final LoadPeonCallback callback
)
{
synchronized (lock) {
@ -134,8 +143,6 @@ public class LoadQueuePeon
}
}
final SegmentHolder holder = new SegmentHolder(segment, LOAD, Arrays.asList(callback));
synchronized (lock) {
final SegmentHolder existingHolder = segmentsToLoad.get(segment);
if (existingHolder != null) {
@ -148,13 +155,13 @@ public class LoadQueuePeon
log.info("Asking server peon[%s] to load segment[%s]", basePath, segment.getIdentifier());
queuedSize.addAndGet(segment.getSize());
segmentsToLoad.put(segment, holder);
segmentsToLoad.put(segment, new SegmentHolder(segment, LOAD, Arrays.asList(callback)));
doNext();
}
public void dropSegment(
DataSegment segment,
LoadPeonCallback callback
final DataSegment segment,
final LoadPeonCallback callback
)
{
synchronized (lock) {
@ -167,8 +174,6 @@ public class LoadQueuePeon
}
}
SegmentHolder holder = new SegmentHolder(segment, DROP, Arrays.asList(callback));
synchronized (lock) {
final SegmentHolder existingHolder = segmentsToDrop.get(segment);
if (existingHolder != null) {
@ -180,7 +185,7 @@ public class LoadQueuePeon
}
log.info("Asking server peon[%s] to drop segment[%s]", basePath, segment.getIdentifier());
segmentsToDrop.put(segment, holder);
segmentsToDrop.put(segment, new SegmentHolder(segment, DROP, Arrays.asList(callback)));
doNext();
}
@ -305,14 +310,15 @@ public class LoadQueuePeon
throw new UnsupportedOperationException();
}
final List<LoadPeonCallback> callbacks = currentlyProcessing.getCallbacks();
currentlyProcessing = null;
callBackExecutor.execute(
new Runnable()
{
@Override
public void run()
{
currentlyProcessing.executeCallbacks();
currentlyProcessing = null;
executeCallbacks(callbacks);
}
}
);
@ -323,20 +329,20 @@ public class LoadQueuePeon
{
synchronized (lock) {
if (currentlyProcessing != null) {
currentlyProcessing.executeCallbacks();
executeCallbacks(currentlyProcessing.getCallbacks());
currentlyProcessing = null;
}
if (!segmentsToDrop.isEmpty()) {
for (SegmentHolder holder : segmentsToDrop.values()) {
holder.executeCallbacks();
executeCallbacks(holder.getCallbacks());
}
}
segmentsToDrop.clear();
if (!segmentsToLoad.isEmpty()) {
for (SegmentHolder holder : segmentsToLoad.values()) {
holder.executeCallbacks();
executeCallbacks(holder.getCallbacks());
}
}
segmentsToLoad.clear();
@ -433,15 +439,10 @@ public class LoadQueuePeon
}
}
public void executeCallbacks()
public List<LoadPeonCallback> getCallbacks()
{
synchronized (callbacks) {
for (LoadPeonCallback callback : callbacks) {
if (callback != null) {
callback.execute();
}
}
callbacks.clear();
return callbacks;
}
}