fix bug with comparing historical servers

This commit is contained in:
Fangjin Yang 2012-12-07 17:49:18 -08:00
parent da4b31c28f
commit 4695fbfcd9
2 changed files with 31 additions and 17 deletions

View File

@ -143,7 +143,7 @@ public abstract class InventoryManager<T>
return dataSources.get(key);
}
public Collection<T> getInventory()
public Iterable<T> getInventory()
{
return dataSources.values();
}

View File

@ -20,6 +20,7 @@
package com.metamx.druid.master;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@ -32,6 +33,7 @@ import com.metamx.common.Pair;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.Comparators;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
@ -590,8 +592,22 @@ public class DruidMaster
@Override
public DruidMasterRuntimeParams run(DruidMasterRuntimeParams params)
{
// Display info about all servers
Collection<DruidServer> servers = serverInventoryManager.getInventory();
// Display info about all historical servers
Iterable<DruidServer> servers =
FunctionalIterable.create(serverInventoryManager.getInventory())
.filter(
new Predicate<DruidServer>()
{
@Override
public boolean apply(
@Nullable DruidServer input
)
{
return input.getType()
.equalsIgnoreCase("historical");
}
}
);
if (log.isDebugEnabled()) {
log.debug("Servers");
for (DruidServer druidServer : servers) {
@ -606,21 +622,19 @@ public class DruidMaster
// Find all historical servers, group them by subType and sort by ascending usage
final DruidCluster cluster = new DruidCluster();
for (DruidServer server : servers) {
if (server.getType().equalsIgnoreCase("historical")) {
if (!loadManagementPeons.containsKey(server.getName())) {
String basePath = yp.combineParts(Arrays.asList(config.getLoadQueuePath(), server.getName()));
LoadQueuePeon loadQueuePeon = new LoadQueuePeon(yp, basePath, peonExec);
log.info("Creating LoadQueuePeon for server[%s] at path[%s]", server.getName(), basePath);
if (!loadManagementPeons.containsKey(server.getName())) {
String basePath = yp.combineParts(Arrays.asList(config.getLoadQueuePath(), server.getName()));
LoadQueuePeon loadQueuePeon = new LoadQueuePeon(yp, basePath, peonExec);
log.info("Creating LoadQueuePeon for server[%s] at path[%s]", server.getName(), basePath);
loadManagementPeons.put(
server.getName(),
loadQueuePeon
);
yp.registerListener(basePath, loadQueuePeon);
}
cluster.add(new ServerHolder(server, loadManagementPeons.get(server.getName())));
loadManagementPeons.put(
server.getName(),
loadQueuePeon
);
yp.registerListener(basePath, loadQueuePeon);
}
cluster.add(new ServerHolder(server, loadManagementPeons.get(server.getName())));
}
SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(cluster);
@ -628,7 +642,7 @@ public class DruidMaster
// Stop peons for servers that aren't there anymore.
for (String name : Sets.difference(
Sets.newHashSet(
Collections2.transform(
Iterables.transform(
servers,
new Function<DruidServer, String>()
{