mirror of
https://github.com/apache/druid.git
synced 2025-02-09 03:24:55 +00:00
revert lambda conversion to fix occasional jvm error (#5591)
This commit is contained in:
parent
5ab17668c0
commit
ea4f8544fb
@ -691,58 +691,63 @@ public class DruidCoordinator
|
|||||||
super(
|
super(
|
||||||
ImmutableList.of(
|
ImmutableList.of(
|
||||||
new DruidCoordinatorSegmentInfoLoader(DruidCoordinator.this),
|
new DruidCoordinatorSegmentInfoLoader(DruidCoordinator.this),
|
||||||
params -> {
|
new DruidCoordinatorHelper()
|
||||||
// Display info about all historical servers
|
{
|
||||||
Iterable<ImmutableDruidServer> servers = FunctionalIterable
|
@Override
|
||||||
.create(serverInventoryView.getInventory())
|
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
|
||||||
.filter(DruidServer::segmentReplicatable)
|
{
|
||||||
.transform(DruidServer::toImmutableDruidServer);
|
// Display info about all historical servers
|
||||||
|
Iterable<ImmutableDruidServer> servers = FunctionalIterable
|
||||||
|
.create(serverInventoryView.getInventory())
|
||||||
|
.filter(DruidServer::segmentReplicatable)
|
||||||
|
.transform(DruidServer::toImmutableDruidServer);
|
||||||
|
|
||||||
if (log.isDebugEnabled()) {
|
if (log.isDebugEnabled()) {
|
||||||
log.debug("Servers");
|
log.debug("Servers");
|
||||||
for (ImmutableDruidServer druidServer : servers) {
|
for (ImmutableDruidServer druidServer : servers) {
|
||||||
log.debug(" %s", druidServer);
|
log.debug(" %s", druidServer);
|
||||||
log.debug(" -- DataSources");
|
log.debug(" -- DataSources");
|
||||||
for (ImmutableDruidDataSource druidDataSource : druidServer.getDataSources()) {
|
for (ImmutableDruidDataSource druidDataSource : druidServer.getDataSources()) {
|
||||||
log.debug(" %s", druidDataSource);
|
log.debug(" %s", druidDataSource);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// Find all historical servers, group them by subType and sort by ascending usage
|
// Find all historical servers, group them by subType and sort by ascending usage
|
||||||
final DruidCluster cluster = new DruidCluster();
|
final DruidCluster cluster = new DruidCluster();
|
||||||
for (ImmutableDruidServer server : servers) {
|
for (ImmutableDruidServer server : servers) {
|
||||||
if (!loadManagementPeons.containsKey(server.getName())) {
|
if (!loadManagementPeons.containsKey(server.getName())) {
|
||||||
LoadQueuePeon loadQueuePeon = taskMaster.giveMePeon(server);
|
LoadQueuePeon loadQueuePeon = taskMaster.giveMePeon(server);
|
||||||
loadQueuePeon.start();
|
loadQueuePeon.start();
|
||||||
log.info("Created LoadQueuePeon for server[%s].", server.getName());
|
log.info("Created LoadQueuePeon for server[%s].", server.getName());
|
||||||
|
|
||||||
loadManagementPeons.put(server.getName(), loadQueuePeon);
|
loadManagementPeons.put(server.getName(), loadQueuePeon);
|
||||||
|
}
|
||||||
|
|
||||||
|
cluster.add(new ServerHolder(server, loadManagementPeons.get(server.getName())));
|
||||||
}
|
}
|
||||||
|
|
||||||
cluster.add(new ServerHolder(server, loadManagementPeons.get(server.getName())));
|
segmentReplicantLookup = SegmentReplicantLookup.make(cluster);
|
||||||
}
|
|
||||||
|
|
||||||
segmentReplicantLookup = SegmentReplicantLookup.make(cluster);
|
// Stop peons for servers that aren't there anymore.
|
||||||
|
final Set<String> disappeared = Sets.newHashSet(loadManagementPeons.keySet());
|
||||||
|
for (ImmutableDruidServer server : servers) {
|
||||||
|
disappeared.remove(server.getName());
|
||||||
|
}
|
||||||
|
for (String name : disappeared) {
|
||||||
|
log.info("Removing listener for server[%s] which is no longer there.", name);
|
||||||
|
LoadQueuePeon peon = loadManagementPeons.remove(name);
|
||||||
|
peon.stop();
|
||||||
|
}
|
||||||
|
|
||||||
// Stop peons for servers that aren't there anymore.
|
return params.buildFromExisting()
|
||||||
final Set<String> disappeared = Sets.newHashSet(loadManagementPeons.keySet());
|
.withDruidCluster(cluster)
|
||||||
for (ImmutableDruidServer server : servers) {
|
.withDatabaseRuleManager(metadataRuleManager)
|
||||||
disappeared.remove(server.getName());
|
.withLoadManagementPeons(loadManagementPeons)
|
||||||
|
.withSegmentReplicantLookup(segmentReplicantLookup)
|
||||||
|
.withBalancerReferenceTimestamp(DateTimes.nowUtc())
|
||||||
|
.build();
|
||||||
}
|
}
|
||||||
for (String name : disappeared) {
|
|
||||||
log.info("Removing listener for server[%s] which is no longer there.", name);
|
|
||||||
LoadQueuePeon peon = loadManagementPeons.remove(name);
|
|
||||||
peon.stop();
|
|
||||||
}
|
|
||||||
|
|
||||||
return params.buildFromExisting()
|
|
||||||
.withDruidCluster(cluster)
|
|
||||||
.withDatabaseRuleManager(metadataRuleManager)
|
|
||||||
.withLoadManagementPeons(loadManagementPeons)
|
|
||||||
.withSegmentReplicantLookup(segmentReplicantLookup)
|
|
||||||
.withBalancerReferenceTimestamp(DateTimes.nowUtc())
|
|
||||||
.build();
|
|
||||||
},
|
},
|
||||||
new DruidCoordinatorRuleRunner(DruidCoordinator.this),
|
new DruidCoordinatorRuleRunner(DruidCoordinator.this),
|
||||||
new DruidCoordinatorCleanupUnneeded(DruidCoordinator.this),
|
new DruidCoordinatorCleanupUnneeded(DruidCoordinator.this),
|
||||||
|
Loading…
x
Reference in New Issue
Block a user