From e5896f486393e1940ba4d3c9218bce194ca1abc5 Mon Sep 17 00:00:00 2001 From: fjy Date: Fri, 8 Aug 2014 15:50:57 -0700 Subject: [PATCH] fix yet another bug with LQP --- .../server/coordinator/LoadQueuePeon.java | 62 ++++--------------- 1 file changed, 13 insertions(+), 49 deletions(-) diff --git a/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java b/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java index 0a3337f0ff0..6341aaee2cc 100644 --- a/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java +++ b/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java @@ -21,8 +21,6 @@ package io.druid.server.coordinator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Function; -import com.google.common.collect.Collections2; import com.google.common.collect.Lists; import com.metamx.common.ISE; import com.metamx.common.guava.Comparators; @@ -45,7 +43,6 @@ import java.util.Comparator; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -60,16 +57,7 @@ public class LoadQueuePeon private static final int DROP = 0; private static final int LOAD = 1; - private static Comparator segmentHolderComparator = new Comparator() - { - private Comparator comparator = Comparators.inverse(DataSegment.bucketMonthComparator()); - - @Override - public int compare(SegmentHolder lhs, SegmentHolder rhs) - { - return comparator.compare(lhs.getSegment(), rhs.getSegment()); - } - }; + private static Comparator segmentHolderComparator = Comparators.inverse(DataSegment.bucketMonthComparator()); private final CuratorFramework curator; private final String basePath; @@ -81,10 +69,10 @@ public class LoadQueuePeon private final AtomicLong queuedSize = new AtomicLong(0); private final AtomicInteger failedAssignCount = new AtomicInteger(0); - private final ConcurrentSkipListMap segmentsToLoad = new ConcurrentSkipListMap<>( + private final ConcurrentSkipListMap segmentsToLoad = new ConcurrentSkipListMap<>( segmentHolderComparator ); - private final ConcurrentSkipListMap segmentsToDrop = new ConcurrentSkipListMap<>( + private final ConcurrentSkipListMap segmentsToDrop = new ConcurrentSkipListMap<>( segmentHolderComparator ); @@ -112,37 +100,13 @@ public class LoadQueuePeon @JsonProperty public Set getSegmentsToLoad() { - return new ConcurrentSkipListSet<>( - Collections2.transform( - segmentsToLoad.keySet(), - new Function() - { - @Override - public DataSegment apply(SegmentHolder input) - { - return input.getSegment(); - } - } - ) - ); + return segmentsToLoad.keySet(); } @JsonProperty public Set getSegmentsToDrop() { - return new ConcurrentSkipListSet<>( - Collections2.transform( - segmentsToDrop.keySet(), - new Function() - { - @Override - public DataSegment apply(SegmentHolder input) - { - return input.getSegment(); - } - } - ) - ); + return segmentsToDrop.keySet(); } public long getLoadQueueSize() @@ -173,7 +137,7 @@ public class LoadQueuePeon final SegmentHolder holder = new SegmentHolder(segment, LOAD, Arrays.asList(callback)); synchronized (lock) { - final SegmentHolder existingHolder = segmentsToLoad.get(holder); + final SegmentHolder existingHolder = segmentsToLoad.get(segment); if (existingHolder != null) { if ((callback != null)) { existingHolder.addCallback(callback); @@ -184,7 +148,7 @@ public class LoadQueuePeon log.info("Asking server peon[%s] to load segment[%s]", basePath, segment.getIdentifier()); queuedSize.addAndGet(segment.getSize()); - segmentsToLoad.put(holder, holder); + segmentsToLoad.put(segment, holder); doNext(); } @@ -206,7 +170,7 @@ public class LoadQueuePeon SegmentHolder holder = new SegmentHolder(segment, DROP, Arrays.asList(callback)); synchronized (lock) { - final SegmentHolder existingHolder = segmentsToDrop.get(holder); + final SegmentHolder existingHolder = segmentsToDrop.get(segment); if (existingHolder != null) { if (callback != null) { existingHolder.addCallback(callback); @@ -216,7 +180,7 @@ public class LoadQueuePeon } log.info("Asking server peon[%s] to drop segment[%s]", basePath, segment.getIdentifier()); - segmentsToDrop.put(holder, holder); + segmentsToDrop.put(segment, holder); doNext(); } @@ -225,10 +189,10 @@ public class LoadQueuePeon synchronized (lock) { if (currentlyProcessing == null) { if (!segmentsToDrop.isEmpty()) { - currentlyProcessing = segmentsToDrop.firstKey(); + currentlyProcessing = segmentsToDrop.firstEntry().getValue(); log.info("Server[%s] dropping [%s]", basePath, currentlyProcessing.getSegmentIdentifier()); } else if (!segmentsToLoad.isEmpty()) { - currentlyProcessing = segmentsToLoad.firstKey(); + currentlyProcessing = segmentsToLoad.firstEntry().getValue(); log.info("Server[%s] loading [%s]", basePath, currentlyProcessing.getSegmentIdentifier()); } else { return; @@ -364,14 +328,14 @@ public class LoadQueuePeon } if (!segmentsToDrop.isEmpty()) { - for (SegmentHolder holder : segmentsToDrop.keySet()) { + for (SegmentHolder holder : segmentsToDrop.values()) { holder.executeCallbacks(); } } segmentsToDrop.clear(); if (!segmentsToLoad.isEmpty()) { - for (SegmentHolder holder : segmentsToLoad.keySet()) { + for (SegmentHolder holder : segmentsToLoad.values()) { holder.executeCallbacks(); } }