diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java index 05dad05bd96..a10dd285390 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java +++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java @@ -71,9 +71,9 @@ public class BrokerServerView implements TimelineServerView private final Object lock = new Object(); - private final ConcurrentMap clients; - private final Map selectors; - private final Map> timelines; + private final ConcurrentMap clients = new ConcurrentHashMap<>(); + private final Map selectors = new HashMap<>(); + private final Map> timelines = new HashMap<>(); private final ConcurrentMap timelineCallbacks = new ConcurrentHashMap<>(); private final QueryToolChestWarehouse warehouse; @@ -107,9 +107,6 @@ public class BrokerServerView implements TimelineServerView this.baseView = baseView; this.tierSelectorStrategy = tierSelectorStrategy; this.emitter = emitter; - this.clients = new ConcurrentHashMap<>(); - this.selectors = new HashMap<>(); - this.timelines = new HashMap<>(); // Validate and set the segment watcher config validateSegmentWatcherConfig(segmentWatcherConfig); @@ -183,10 +180,10 @@ public class BrokerServerView implements TimelineServerView { if (segmentWatcherConfig.isAwaitInitializationOnStart()) { final long startMillis = System.currentTimeMillis(); - log.info("%s waiting for initialization.", getClass().getSimpleName()); + log.info("BrokerServerView waiting for initialization."); awaitInitialization(); final long endMillis = System.currentTimeMillis(); - log.info("%s initialized in [%,d] ms.", getClass().getSimpleName(), endMillis - startMillis); + log.info("BrokerServerView initialized in [%,d] ms.", endMillis - startMillis); emitter.emit(ServiceMetricEvent.builder().build( "init/serverview/time", endMillis - startMillis @@ -267,7 +264,7 @@ public class BrokerServerView implements TimelineServerView private void serverAddedSegment(final DruidServerMetadata server, final DataSegment segment) { - SegmentId segmentId = segment.getId(); + final SegmentId segmentId = segment.getId(); synchronized (lock) { // in theory we could probably just filter this to ensure we don't put ourselves in here, to make broker tree // query topologies, but for now just skip all brokers, so we don't create some sort of wild infinite query @@ -291,7 +288,17 @@ public class BrokerServerView implements TimelineServerView QueryableDruidServer queryableDruidServer = clients.get(server.getName()); if (queryableDruidServer == null) { - queryableDruidServer = addServer(baseView.getInventoryValue(server.getName())); + DruidServer inventoryValue = baseView.getInventoryValue(server.getName()); + if (inventoryValue == null) { + log.warn( + "Could not find server[%s] in inventory. Skipping addition of segment[%s].", + server.getName(), + segmentId + ); + return; + } else { + queryableDruidServer = addServer(inventoryValue); + } } selector.addServerAndUpdateSegment(queryableDruidServer, segment); } @@ -302,8 +309,7 @@ public class BrokerServerView implements TimelineServerView private void serverRemovedSegment(DruidServerMetadata server, DataSegment segment) { - - SegmentId segmentId = segment.getId(); + final SegmentId segmentId = segment.getId(); final ServerSelector selector; synchronized (lock) { @@ -323,7 +329,13 @@ public class BrokerServerView implements TimelineServerView } QueryableDruidServer queryableDruidServer = clients.get(server.getName()); - if (!selector.removeServer(queryableDruidServer)) { + if (queryableDruidServer == null) { + log.warn( + "Could not find server[%s] in inventory. Skipping removal of segment[%s].", + server.getName(), + segmentId + ); + } else if (!selector.removeServer(queryableDruidServer)) { log.warn( "Asked to disassociate non-existant association between server[%s] and segment[%s]", server, @@ -378,7 +390,7 @@ public class BrokerServerView implements TimelineServerView synchronized (lock) { QueryableDruidServer queryableDruidServer = clients.get(server.getName()); if (queryableDruidServer == null) { - log.error("No QueryableDruidServer found for %s", server.getName()); + log.error("No QueryRunner found for server name[%s].", server.getName()); return null; } return queryableDruidServer.getQueryRunner();