SOLR-11590: Synchronize ZK connect/disconnect handling so that they are processed in linear order

This commit is contained in:
Varun Thacker 2017-12-05 13:34:50 -08:00
parent eb434dc47a
commit 2c14b91418
3 changed files with 20 additions and 5 deletions

View File

@ -205,6 +205,9 @@ Bug Fixes
* SOLR-11458: Improve error handling in MoveReplicaCmd to avoid potential loss of data. (ab)
* SOLR-11590: Synchronize ZK connect/disconnect handling so that they are processed in linear order
(noble, Varun Thacker)
Optimizations
----------------------
* SOLR-11285: Refactor autoscaling framework to avoid direct references to Zookeeper and Solr

View File

@ -43,8 +43,6 @@ public class ConnectionManager implements Watcher {
private final String zkServerAddress;
private final SolrZkClient client;
private final OnReconnect onReconnect;
@ -118,6 +116,7 @@ public class ConnectionManager implements Watcher {
KeeperState state = event.getState();
if (state == KeeperState.SyncConnected) {
log.info("zkClient has connected");
connected();
connectionStrategy.connected();
} else if (state == Expired) {
@ -179,7 +178,7 @@ public class ConnectionManager implements Watcher {
}
} while (!isClosed);
log.info("Connected:" + connected);
log.info("zkClient Connected:" + connected);
} else if (state == KeeperState.Disconnected) {
log.warn("zkClient has disconnected");
disconnected();

View File

@ -77,7 +77,10 @@ public class SolrZkClient implements Closeable {
private ZkCmdExecutor zkCmdExecutor;
private final ExecutorService zkCallbackExecutor = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("zkCallback"));
private final ExecutorService zkCallbackExecutor =
ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("zkCallback"));
private final ExecutorService zkConnManagerCallbackExecutor =
ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrjNamedThreadFactory("zkConnectionManagerCallback"));
private volatile boolean isClosed = false;
private ZkClientConnectionStrategy zkClientConnectionStrategy;
@ -259,7 +262,11 @@ public class SolrZkClient implements Closeable {
public void process(final WatchedEvent event) {
log.debug("Submitting job to respond to event " + event);
try {
zkCallbackExecutor.submit(() -> watcher.process(event));
if (watcher instanceof ConnectionManager) {
zkConnManagerCallbackExecutor.submit(() -> watcher.process(event));
} else {
zkCallbackExecutor.submit(() -> watcher.process(event));
}
} catch (RejectedExecutionException e) {
// If not a graceful shutdown
if (!isClosed()) {
@ -680,6 +687,12 @@ public class SolrZkClient implements Closeable {
} catch (Exception e) {
SolrException.log(log, e);
}
try {
ExecutorUtil.shutdownAndAwaitTermination(zkConnManagerCallbackExecutor);
} catch (Exception e) {
SolrException.log(log, e);
}
}