Handle null values in BrokerServerView.serverAddedSegment (#13980)

Due to race conditions, the BrokerServerView may sometimes try to add a segment to a server that has already been removed from the inventory. This results in a NullPointerException (NPE) and keeps the BrokerServerView from processing all change requests.
This commit is contained in:
Kashif Faraz 2023-03-30 16:19:05 +05:30 committed by GitHub
parent 61a35262ec
commit 47face9ca9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -71,9 +71,9 @@ public class BrokerServerView implements TimelineServerView
private final Object lock = new Object(); private final Object lock = new Object();
private final ConcurrentMap<String, QueryableDruidServer> clients; private final ConcurrentMap<String, QueryableDruidServer> clients = new ConcurrentHashMap<>();
private final Map<SegmentId, ServerSelector> selectors; private final Map<SegmentId, ServerSelector> selectors = new HashMap<>();
private final Map<String, VersionedIntervalTimeline<String, ServerSelector>> timelines; private final Map<String, VersionedIntervalTimeline<String, ServerSelector>> timelines = new HashMap<>();
private final ConcurrentMap<TimelineCallback, Executor> timelineCallbacks = new ConcurrentHashMap<>(); private final ConcurrentMap<TimelineCallback, Executor> timelineCallbacks = new ConcurrentHashMap<>();
private final QueryToolChestWarehouse warehouse; private final QueryToolChestWarehouse warehouse;
@ -107,9 +107,6 @@ public class BrokerServerView implements TimelineServerView
this.baseView = baseView; this.baseView = baseView;
this.tierSelectorStrategy = tierSelectorStrategy; this.tierSelectorStrategy = tierSelectorStrategy;
this.emitter = emitter; this.emitter = emitter;
this.clients = new ConcurrentHashMap<>();
this.selectors = new HashMap<>();
this.timelines = new HashMap<>();
// Validate and set the segment watcher config // Validate and set the segment watcher config
validateSegmentWatcherConfig(segmentWatcherConfig); validateSegmentWatcherConfig(segmentWatcherConfig);
@ -183,10 +180,10 @@ public class BrokerServerView implements TimelineServerView
{ {
if (segmentWatcherConfig.isAwaitInitializationOnStart()) { if (segmentWatcherConfig.isAwaitInitializationOnStart()) {
final long startMillis = System.currentTimeMillis(); final long startMillis = System.currentTimeMillis();
log.info("%s waiting for initialization.", getClass().getSimpleName()); log.info("BrokerServerView waiting for initialization.");
awaitInitialization(); awaitInitialization();
final long endMillis = System.currentTimeMillis(); final long endMillis = System.currentTimeMillis();
log.info("%s initialized in [%,d] ms.", getClass().getSimpleName(), endMillis - startMillis); log.info("BrokerServerView initialized in [%,d] ms.", endMillis - startMillis);
emitter.emit(ServiceMetricEvent.builder().build( emitter.emit(ServiceMetricEvent.builder().build(
"init/serverview/time", "init/serverview/time",
endMillis - startMillis endMillis - startMillis
@ -267,7 +264,7 @@ public class BrokerServerView implements TimelineServerView
private void serverAddedSegment(final DruidServerMetadata server, final DataSegment segment) private void serverAddedSegment(final DruidServerMetadata server, final DataSegment segment)
{ {
SegmentId segmentId = segment.getId(); final SegmentId segmentId = segment.getId();
synchronized (lock) { synchronized (lock) {
// in theory we could probably just filter this to ensure we don't put ourselves in here, to make broker tree // in theory we could probably just filter this to ensure we don't put ourselves in here, to make broker tree
// query topologies, but for now just skip all brokers, so we don't create some sort of wild infinite query // query topologies, but for now just skip all brokers, so we don't create some sort of wild infinite query
@ -291,7 +288,17 @@ public class BrokerServerView implements TimelineServerView
QueryableDruidServer queryableDruidServer = clients.get(server.getName()); QueryableDruidServer queryableDruidServer = clients.get(server.getName());
if (queryableDruidServer == null) { if (queryableDruidServer == null) {
queryableDruidServer = addServer(baseView.getInventoryValue(server.getName())); DruidServer inventoryValue = baseView.getInventoryValue(server.getName());
if (inventoryValue == null) {
log.warn(
"Could not find server[%s] in inventory. Skipping addition of segment[%s].",
server.getName(),
segmentId
);
return;
} else {
queryableDruidServer = addServer(inventoryValue);
}
} }
selector.addServerAndUpdateSegment(queryableDruidServer, segment); selector.addServerAndUpdateSegment(queryableDruidServer, segment);
} }
@ -302,8 +309,7 @@ public class BrokerServerView implements TimelineServerView
private void serverRemovedSegment(DruidServerMetadata server, DataSegment segment) private void serverRemovedSegment(DruidServerMetadata server, DataSegment segment)
{ {
final SegmentId segmentId = segment.getId();
SegmentId segmentId = segment.getId();
final ServerSelector selector; final ServerSelector selector;
synchronized (lock) { synchronized (lock) {
@ -323,7 +329,13 @@ public class BrokerServerView implements TimelineServerView
} }
QueryableDruidServer queryableDruidServer = clients.get(server.getName()); QueryableDruidServer queryableDruidServer = clients.get(server.getName());
if (!selector.removeServer(queryableDruidServer)) { if (queryableDruidServer == null) {
log.warn(
"Could not find server[%s] in inventory. Skipping removal of segment[%s].",
server.getName(),
segmentId
);
} else if (!selector.removeServer(queryableDruidServer)) {
log.warn( log.warn(
"Asked to disassociate non-existant association between server[%s] and segment[%s]", "Asked to disassociate non-existant association between server[%s] and segment[%s]",
server, server,
@ -378,7 +390,7 @@ public class BrokerServerView implements TimelineServerView
synchronized (lock) { synchronized (lock) {
QueryableDruidServer queryableDruidServer = clients.get(server.getName()); QueryableDruidServer queryableDruidServer = clients.get(server.getName());
if (queryableDruidServer == null) { if (queryableDruidServer == null) {
log.error("No QueryableDruidServer found for %s", server.getName()); log.error("No QueryRunner found for server name[%s].", server.getName());
return null; return null;
} }
return queryableDruidServer.getQueryRunner(); return queryableDruidServer.getQueryRunner();