improve mem usage for getLoadStatus

This commit is contained in:
nishantmonu51 2014-06-06 00:23:34 +05:30
parent ab31300513
commit 5ef660f6d1
1 changed files with 14 additions and 39 deletions

View File

@ -77,6 +77,7 @@ import org.joda.time.Duration;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -91,11 +92,8 @@ import java.util.concurrent.atomic.AtomicReference;
public class DruidCoordinator
{
public static final String COORDINATOR_OWNER_NODE = "_COORDINATOR";
private static final EmittingLogger log = new EmittingLogger(DruidCoordinator.class);
private final Object lock = new Object();
private final DruidCoordinatorConfig config;
private final ZkPathsConfig zkPaths;
private final JacksonConfigManager configManager;
@ -111,7 +109,6 @@ public class DruidCoordinator
private final AtomicReference<LeaderLatch> leaderLatch;
private final ServiceAnnouncer serviceAnnouncer;
private final DruidNode self;
private volatile boolean started = false;
private volatile int leaderCounter = 0;
private volatile boolean leader = false;
@ -234,7 +231,6 @@ public class DruidCoordinator
return retVal;
}
public CountingMap<String> getSegmentAvailability()
{
final CountingMap<String> retVal = new CountingMap<>();
@ -253,43 +249,22 @@ public class DruidCoordinator
public Map<String, Double> getLoadStatus()
{
// find available segments
Map<String, Set<DataSegment>> availableSegments = Maps.newHashMap();
for (DataSegment dataSegment : getAvailableDataSegments()) {
Set<DataSegment> segments = availableSegments.get(dataSegment.getDataSource());
if (segments == null) {
segments = Sets.newHashSet();
availableSegments.put(dataSegment.getDataSource(), segments);
}
segments.add(dataSegment);
}
// find segments currently loaded
Map<String, Set<DataSegment>> segmentsInCluster = Maps.newHashMap();
for (DruidServer druidServer : serverInventoryView.getInventory()) {
for (DruidDataSource druidDataSource : druidServer.getDataSources()) {
Set<DataSegment> segments = segmentsInCluster.get(druidDataSource.getName());
if (segments == null) {
segments = Sets.newHashSet();
segmentsInCluster.put(druidDataSource.getName(), segments);
}
segments.addAll(druidDataSource.getSegments());
}
}
// compare available segments with currently loaded
Map<String, Double> loadStatus = Maps.newHashMap();
for (Map.Entry<String, Set<DataSegment>> entry : availableSegments.entrySet()) {
String dataSource = entry.getKey();
Set<DataSegment> segmentsAvailable = entry.getValue();
Set<DataSegment> loadedSegments = segmentsInCluster.get(dataSource);
if (loadedSegments == null) {
loadedSegments = Sets.newHashSet();
for (DruidDataSource dataSource : databaseSegmentManager.getInventory()) {
final Set<DataSegment> segments = Sets.newHashSet(dataSource.getSegments());
final int availableSegmentSize = segments.size();
// remove loaded segments
for (DruidServer druidServer : serverInventoryView.getInventory()) {
final DruidDataSource loadedView = druidServer.getDataSource(dataSource.getName());
if (loadedView != null) {
segments.removeAll(loadedView.getSegments());
}
}
Set<DataSegment> unloadedSegments = Sets.difference(segmentsAvailable, loadedSegments);
final int unloadedSegmentSize = segments.size();
loadStatus.put(
dataSource,
100 * ((double) (segmentsAvailable.size() - unloadedSegments.size()) / (double) segmentsAvailable.size())
dataSource.getName(),
100 * ((double) (availableSegmentSize - unloadedSegmentSize) / (double) availableSegmentSize)
);
}