From dba7c7d3cd9fe707ecefa45498e1bcba32334b55 Mon Sep 17 00:00:00 2001 From: Niketh Sabbineni Date: Tue, 15 Aug 2017 10:14:20 -0700 Subject: [PATCH] Reduce excessive logging (#4680) * Reduce excessive logging * Refactoring code as per comments --- .../server/coordinator/LoadQueuePeon.java | 176 +++++++++--------- 1 file changed, 89 insertions(+), 87 deletions(-) diff --git a/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java b/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java index f689fe6e274..846069fb334 100644 --- a/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java +++ b/server/src/main/java/io/druid/server/coordinator/LoadQueuePeon.java @@ -193,99 +193,101 @@ public class LoadQueuePeon private void processSegmentChangeRequest() { - if (currentlyProcessing == null) { - if (!segmentsToDrop.isEmpty()) { - currentlyProcessing = segmentsToDrop.firstEntry().getValue(); - log.info("Server[%s] dropping [%s]", basePath, currentlyProcessing.getSegmentIdentifier()); - } else if (!segmentsToLoad.isEmpty()) { - currentlyProcessing = segmentsToLoad.firstEntry().getValue(); - log.info("Server[%s] loading [%s]", basePath, currentlyProcessing.getSegmentIdentifier()); - } else { + if (currentlyProcessing != null) { + log.debug( + "Server[%s] skipping processSegmentChangeRequest because something is currently loading[%s].", + basePath, + currentlyProcessing.getSegmentIdentifier() + ); + + return; + } + + if (!segmentsToDrop.isEmpty()) { + currentlyProcessing = segmentsToDrop.firstEntry().getValue(); + log.info("Server[%s] dropping [%s]", basePath, currentlyProcessing.getSegmentIdentifier()); + } else if (!segmentsToLoad.isEmpty()) { + currentlyProcessing = segmentsToLoad.firstEntry().getValue(); + log.info("Server[%s] loading [%s]", basePath, currentlyProcessing.getSegmentIdentifier()); + } else { + return; + } + + try { + if (currentlyProcessing == null) { + if (!stopped) { + log.makeAlert("Crazy race condition! server[%s]", basePath) + .emit(); + } + actionCompleted(); return; } - try { - if (currentlyProcessing == null) { - if (!stopped) { - log.makeAlert("Crazy race condition! server[%s]", basePath) - .emit(); - } - actionCompleted(); - return; - } + log.info("Server[%s] processing segment[%s]", basePath, currentlyProcessing.getSegmentIdentifier()); + final String path = ZKPaths.makePath(basePath, currentlyProcessing.getSegmentIdentifier()); + final byte[] payload = jsonMapper.writeValueAsBytes(currentlyProcessing.getChangeRequest()); + curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload); - log.info("Server[%s] processing segment[%s]", basePath, currentlyProcessing.getSegmentIdentifier()); - final String path = ZKPaths.makePath(basePath, currentlyProcessing.getSegmentIdentifier()); - final byte[] payload = jsonMapper.writeValueAsBytes(currentlyProcessing.getChangeRequest()); - curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload); - - processingExecutor.schedule( - new Runnable() - { - @Override - public void run() - { - try { - if (curator.checkExists().forPath(path) != null) { - failAssign(new ISE("%s was never removed! Failing this operation!", path)); - } - } - catch (Exception e) { - failAssign(e); - } - } - }, - config.getLoadTimeoutDelay().getMillis(), - TimeUnit.MILLISECONDS - ); - - final Stat stat = curator.checkExists().usingWatcher( - new CuratorWatcher() - { - @Override - public void process(WatchedEvent watchedEvent) throws Exception - { - switch (watchedEvent.getType()) { - case NodeDeleted: - entryRemoved(watchedEvent.getPath()); - break; - default: - // do nothing - } - } + processingExecutor.schedule( + new Runnable() + { + @Override + public void run() + { + try { + if (curator.checkExists().forPath(path) != null) { + failAssign(new ISE("%s was never removed! Failing this operation!", path)); } - ).forPath(path); - - if (stat == null) { - final byte[] noopPayload = jsonMapper.writeValueAsBytes(new SegmentChangeRequestNoop()); - - // Create a node and then delete it to remove the registered watcher. This is a work-around for - // a zookeeper race condition. Specifically, when you set a watcher, it fires on the next event - // that happens for that node. If no events happen, the watcher stays registered foreverz. - // Couple that with the fact that you cannot set a watcher when you create a node, but what we - // want is to create a node and then watch for it to get deleted. The solution is that you *can* - // set a watcher when you check to see if it exists so, we first create the node and then set a - // watcher on its existence. However, if already does not exist by the time the existence check - // returns, then the watcher that was set will never fire (nobody will ever create the node - // again) and thus lead to a slow, but real, memory leak. So, we create another node to cause - // that watcher to fire and delete it right away. - // - // We do not create the existence watcher first, because then it will fire when we create the - // node and we'll have the same race when trying to refresh that watcher. - curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, noopPayload); - - entryRemoved(path); - } - } - catch (Exception e) { - failAssign(e); - } - } else { - log.info( - "Server[%s] skipping doNext() because something is currently loading[%s].", - basePath, - currentlyProcessing.getSegmentIdentifier() + } + catch (Exception e) { + failAssign(e); + } + } + }, + config.getLoadTimeoutDelay().getMillis(), + TimeUnit.MILLISECONDS ); + + final Stat stat = curator.checkExists().usingWatcher( + new CuratorWatcher() + { + @Override + public void process(WatchedEvent watchedEvent) throws Exception + { + switch (watchedEvent.getType()) { + case NodeDeleted: + entryRemoved(watchedEvent.getPath()); + break; + default: + // do nothing + } + } + } + ).forPath(path); + + if (stat == null) { + final byte[] noopPayload = jsonMapper.writeValueAsBytes(new SegmentChangeRequestNoop()); + + // Create a node and then delete it to remove the registered watcher. This is a work-around for + // a zookeeper race condition. Specifically, when you set a watcher, it fires on the next event + // that happens for that node. If no events happen, the watcher stays registered foreverz. + // Couple that with the fact that you cannot set a watcher when you create a node, but what we + // want is to create a node and then watch for it to get deleted. The solution is that you *can* + // set a watcher when you check to see if it exists so, we first create the node and then set a + // watcher on its existence. However, if already does not exist by the time the existence check + // returns, then the watcher that was set will never fire (nobody will ever create the node + // again) and thus lead to a slow, but real, memory leak. So, we create another node to cause + // that watcher to fire and delete it right away. + // + // We do not create the existence watcher first, because then it will fire when we create the + // node and we'll have the same race when trying to refresh that watcher. + curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, noopPayload); + + entryRemoved(path); + } + } + catch (Exception e) { + failAssign(e); } }