From 2c14b91418b45c42aba98ea2e612e9c0a53a0948 Mon Sep 17 00:00:00 2001 From: Varun Thacker Date: Tue, 5 Dec 2017 13:34:50 -0800 Subject: [PATCH] SOLR-11590: Synchronize ZK connect/disconnect handling so that they are processed in linear order --- solr/CHANGES.txt | 3 +++ .../solr/common/cloud/ConnectionManager.java | 5 ++--- .../apache/solr/common/cloud/SolrZkClient.java | 17 +++++++++++++++-- 3 files changed, 20 insertions(+), 5 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 91474ebf514..2f09b53703b 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -205,6 +205,9 @@ Bug Fixes * SOLR-11458: Improve error handling in MoveReplicaCmd to avoid potential loss of data. (ab) +* SOLR-11590: Synchronize ZK connect/disconnect handling so that they are processed in linear order + (noble, Varun Thacker) + Optimizations ---------------------- * SOLR-11285: Refactor autoscaling framework to avoid direct references to Zookeeper and Solr diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java index 16b7fcd8b4b..98ddb477879 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java @@ -43,8 +43,6 @@ public class ConnectionManager implements Watcher { private final String zkServerAddress; - - private final SolrZkClient client; private final OnReconnect onReconnect; @@ -118,6 +116,7 @@ public class ConnectionManager implements Watcher { KeeperState state = event.getState(); if (state == KeeperState.SyncConnected) { + log.info("zkClient has connected"); connected(); connectionStrategy.connected(); } else if (state == Expired) { @@ -179,7 +178,7 @@ public class ConnectionManager implements Watcher { } } while (!isClosed); - log.info("Connected:" + connected); + log.info("zkClient Connected:" + connected); } else if (state == KeeperState.Disconnected) { log.warn("zkClient has disconnected"); disconnected(); diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java index d502bf6ab28..a5f303e8ae1 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java @@ -77,7 +77,10 @@ public class SolrZkClient implements Closeable { private ZkCmdExecutor zkCmdExecutor; - private final ExecutorService zkCallbackExecutor = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("zkCallback")); + private final ExecutorService zkCallbackExecutor = + ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("zkCallback")); + private final ExecutorService zkConnManagerCallbackExecutor = + ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrjNamedThreadFactory("zkConnectionManagerCallback")); private volatile boolean isClosed = false; private ZkClientConnectionStrategy zkClientConnectionStrategy; @@ -259,7 +262,11 @@ public class SolrZkClient implements Closeable { public void process(final WatchedEvent event) { log.debug("Submitting job to respond to event " + event); try { - zkCallbackExecutor.submit(() -> watcher.process(event)); + if (watcher instanceof ConnectionManager) { + zkConnManagerCallbackExecutor.submit(() -> watcher.process(event)); + } else { + zkCallbackExecutor.submit(() -> watcher.process(event)); + } } catch (RejectedExecutionException e) { // If not a graceful shutdown if (!isClosed()) { @@ -680,6 +687,12 @@ public class SolrZkClient implements Closeable { } catch (Exception e) { SolrException.log(log, e); } + + try { + ExecutorUtil.shutdownAndAwaitTermination(zkConnManagerCallbackExecutor); + } catch (Exception e) { + SolrException.log(log, e); + } }