fix yet another bug with LQP

This commit is contained in:
fjy 2014-08-08 15:50:57 -07:00
parent 958792db5c
commit e5896f4863
1 changed files with 13 additions and 49 deletions

View File

@ -21,8 +21,6 @@ package io.druid.server.coordinator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.guava.Comparators; import com.metamx.common.guava.Comparators;
@ -45,7 +43,6 @@ import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -60,16 +57,7 @@ public class LoadQueuePeon
private static final int DROP = 0; private static final int DROP = 0;
private static final int LOAD = 1; private static final int LOAD = 1;
private static Comparator<SegmentHolder> segmentHolderComparator = new Comparator<SegmentHolder>() private static Comparator<DataSegment> segmentHolderComparator = Comparators.inverse(DataSegment.bucketMonthComparator());
{
private Comparator<DataSegment> comparator = Comparators.inverse(DataSegment.bucketMonthComparator());
@Override
public int compare(SegmentHolder lhs, SegmentHolder rhs)
{
return comparator.compare(lhs.getSegment(), rhs.getSegment());
}
};
private final CuratorFramework curator; private final CuratorFramework curator;
private final String basePath; private final String basePath;
@ -81,10 +69,10 @@ public class LoadQueuePeon
private final AtomicLong queuedSize = new AtomicLong(0); private final AtomicLong queuedSize = new AtomicLong(0);
private final AtomicInteger failedAssignCount = new AtomicInteger(0); private final AtomicInteger failedAssignCount = new AtomicInteger(0);
private final ConcurrentSkipListMap<SegmentHolder, SegmentHolder> segmentsToLoad = new ConcurrentSkipListMap<>( private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToLoad = new ConcurrentSkipListMap<>(
segmentHolderComparator segmentHolderComparator
); );
private final ConcurrentSkipListMap<SegmentHolder, SegmentHolder> segmentsToDrop = new ConcurrentSkipListMap<>( private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToDrop = new ConcurrentSkipListMap<>(
segmentHolderComparator segmentHolderComparator
); );
@ -112,37 +100,13 @@ public class LoadQueuePeon
@JsonProperty @JsonProperty
public Set<DataSegment> getSegmentsToLoad() public Set<DataSegment> getSegmentsToLoad()
{ {
return new ConcurrentSkipListSet<>( return segmentsToLoad.keySet();
Collections2.transform(
segmentsToLoad.keySet(),
new Function<SegmentHolder, DataSegment>()
{
@Override
public DataSegment apply(SegmentHolder input)
{
return input.getSegment();
}
}
)
);
} }
@JsonProperty @JsonProperty
public Set<DataSegment> getSegmentsToDrop() public Set<DataSegment> getSegmentsToDrop()
{ {
return new ConcurrentSkipListSet<>( return segmentsToDrop.keySet();
Collections2.transform(
segmentsToDrop.keySet(),
new Function<SegmentHolder, DataSegment>()
{
@Override
public DataSegment apply(SegmentHolder input)
{
return input.getSegment();
}
}
)
);
} }
public long getLoadQueueSize() public long getLoadQueueSize()
@ -173,7 +137,7 @@ public class LoadQueuePeon
final SegmentHolder holder = new SegmentHolder(segment, LOAD, Arrays.asList(callback)); final SegmentHolder holder = new SegmentHolder(segment, LOAD, Arrays.asList(callback));
synchronized (lock) { synchronized (lock) {
final SegmentHolder existingHolder = segmentsToLoad.get(holder); final SegmentHolder existingHolder = segmentsToLoad.get(segment);
if (existingHolder != null) { if (existingHolder != null) {
if ((callback != null)) { if ((callback != null)) {
existingHolder.addCallback(callback); existingHolder.addCallback(callback);
@ -184,7 +148,7 @@ public class LoadQueuePeon
log.info("Asking server peon[%s] to load segment[%s]", basePath, segment.getIdentifier()); log.info("Asking server peon[%s] to load segment[%s]", basePath, segment.getIdentifier());
queuedSize.addAndGet(segment.getSize()); queuedSize.addAndGet(segment.getSize());
segmentsToLoad.put(holder, holder); segmentsToLoad.put(segment, holder);
doNext(); doNext();
} }
@ -206,7 +170,7 @@ public class LoadQueuePeon
SegmentHolder holder = new SegmentHolder(segment, DROP, Arrays.asList(callback)); SegmentHolder holder = new SegmentHolder(segment, DROP, Arrays.asList(callback));
synchronized (lock) { synchronized (lock) {
final SegmentHolder existingHolder = segmentsToDrop.get(holder); final SegmentHolder existingHolder = segmentsToDrop.get(segment);
if (existingHolder != null) { if (existingHolder != null) {
if (callback != null) { if (callback != null) {
existingHolder.addCallback(callback); existingHolder.addCallback(callback);
@ -216,7 +180,7 @@ public class LoadQueuePeon
} }
log.info("Asking server peon[%s] to drop segment[%s]", basePath, segment.getIdentifier()); log.info("Asking server peon[%s] to drop segment[%s]", basePath, segment.getIdentifier());
segmentsToDrop.put(holder, holder); segmentsToDrop.put(segment, holder);
doNext(); doNext();
} }
@ -225,10 +189,10 @@ public class LoadQueuePeon
synchronized (lock) { synchronized (lock) {
if (currentlyProcessing == null) { if (currentlyProcessing == null) {
if (!segmentsToDrop.isEmpty()) { if (!segmentsToDrop.isEmpty()) {
currentlyProcessing = segmentsToDrop.firstKey(); currentlyProcessing = segmentsToDrop.firstEntry().getValue();
log.info("Server[%s] dropping [%s]", basePath, currentlyProcessing.getSegmentIdentifier()); log.info("Server[%s] dropping [%s]", basePath, currentlyProcessing.getSegmentIdentifier());
} else if (!segmentsToLoad.isEmpty()) { } else if (!segmentsToLoad.isEmpty()) {
currentlyProcessing = segmentsToLoad.firstKey(); currentlyProcessing = segmentsToLoad.firstEntry().getValue();
log.info("Server[%s] loading [%s]", basePath, currentlyProcessing.getSegmentIdentifier()); log.info("Server[%s] loading [%s]", basePath, currentlyProcessing.getSegmentIdentifier());
} else { } else {
return; return;
@ -364,14 +328,14 @@ public class LoadQueuePeon
} }
if (!segmentsToDrop.isEmpty()) { if (!segmentsToDrop.isEmpty()) {
for (SegmentHolder holder : segmentsToDrop.keySet()) { for (SegmentHolder holder : segmentsToDrop.values()) {
holder.executeCallbacks(); holder.executeCallbacks();
} }
} }
segmentsToDrop.clear(); segmentsToDrop.clear();
if (!segmentsToLoad.isEmpty()) { if (!segmentsToLoad.isEmpty()) {
for (SegmentHolder holder : segmentsToLoad.keySet()) { for (SegmentHolder holder : segmentsToLoad.values()) {
holder.executeCallbacks(); holder.executeCallbacks();
} }
} }