From 7add2f7c62478c74bcfd51bf901db5731eea01bd Mon Sep 17 00:00:00 2001 From: Mark Robert Miller Date: Fri, 3 Aug 2012 22:34:08 +0000 Subject: [PATCH] LUCENE-3985: add close methods to Overseer and ConnectionManager and use them git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1369236 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/solr/cloud/ElectionContext.java | 17 ++++---- .../java/org/apache/solr/cloud/Overseer.java | 40 +++++++++++++++---- .../cloud/OverseerCollectionProcessor.java | 8 +++- .../org/apache/solr/cloud/ZkController.java | 17 +++++--- .../solr/common/cloud/ConnectionManager.java | 36 ++++++++++++++--- .../solr/common/cloud/SolrZkClient.java | 6 ++- 6 files changed, 93 insertions(+), 31 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java index 9412153358b..b5acd84aaf1 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java +++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java @@ -250,16 +250,13 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase { final class OverseerElectionContext extends ElectionContext { private final SolrZkClient zkClient; - private final ZkStateReader stateReader; - private ShardHandler shardHandler; - private String adminPath; + private Overseer overseer; - public OverseerElectionContext(ShardHandler shardHandler, String adminPath, final String zkNodeName, ZkStateReader stateReader) { - super(zkNodeName, "/overseer_elect", "/overseer_elect/leader", null, stateReader.getZkClient()); - this.stateReader = stateReader; - this.shardHandler = shardHandler; - this.adminPath = adminPath; - this.zkClient = stateReader.getZkClient(); + + public OverseerElectionContext(SolrZkClient zkClient, Overseer overseer, final String zkNodeName) { + super(zkNodeName, "/overseer_elect", "/overseer_elect/leader", null, zkClient); + this.overseer = overseer; + this.zkClient = zkClient; } @Override @@ -281,7 +278,7 @@ final class OverseerElectionContext extends ElectionContext { CreateMode.EPHEMERAL, true); } - new Overseer(shardHandler, adminPath, stateReader, id); + overseer.start(id); } } diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java index 7218b675d8b..979cfbef1b6 100644 --- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java +++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java @@ -47,7 +47,7 @@ public class Overseer { private static Logger log = LoggerFactory.getLogger(Overseer.class); - private static class CloudStateUpdater implements Runnable { + private class CloudStateUpdater implements Runnable { private static final String DELETECORE = "deletecore"; private final ZkStateReader reader; @@ -70,7 +70,7 @@ public class Overseer { @Override public void run() { - if(amILeader()) { + if(amILeader() && !Overseer.this.isClosed) { // see if there's something left from the previous Overseer and re // process all events that were not persisted into cloud state synchronized (reader.getUpdateLock()) { //XXX this only protects against edits inside single node @@ -110,7 +110,7 @@ public class Overseer { } log.info("Starting to work on the main queue"); - while (amILeader()) { + while (amILeader() && !isClosed) { synchronized (reader.getUpdateLock()) { try { byte[] head = stateUpdateQueue.peek(); @@ -401,21 +401,45 @@ public class Overseer { } } + + private Thread ccThread; + + private Thread updaterThread; + + private volatile boolean isClosed; + + private ZkStateReader reader; + + private ShardHandler shardHandler; + + private String adminPath; - public Overseer(ShardHandler shardHandler, String adminPath, final ZkStateReader reader, final String id) throws KeeperException, InterruptedException { + public Overseer(ShardHandler shardHandler, String adminPath, final ZkStateReader reader) throws KeeperException, InterruptedException { + this.reader = reader; + this.shardHandler = shardHandler; + this.adminPath = adminPath; + } + + public void start(String id) { log.info("Overseer (id=" + id + ") starting"); createOverseerNode(reader.getZkClient()); //launch cluster state updater thread ThreadGroup tg = new ThreadGroup("Overseer state updater."); - Thread updaterThread = new Thread(tg, new CloudStateUpdater(reader, id)); + updaterThread = new Thread(tg, new CloudStateUpdater(reader, id)); updaterThread.setDaemon(true); - updaterThread.start(); - + ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process."); - Thread ccThread = new Thread(ccTg, new OverseerCollectionProcessor(reader, id, shardHandler, adminPath)); + ccThread = new Thread(ccTg, new OverseerCollectionProcessor(reader, id, shardHandler, adminPath), + "Overseer-" + id); ccThread.setDaemon(true); + + updaterThread.start(); ccThread.start(); } + + public void close() { + isClosed = true; + } /** * Get queue that can be used to send messages to Overseer. diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java index 17f4e472204..da99535bec4 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java @@ -64,6 +64,8 @@ public class OverseerCollectionProcessor implements Runnable { private String adminPath; private ZkStateReader zkStateReader; + + private boolean isClosed; public OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, ShardHandler shardHandler, String adminPath) { this.zkStateReader = zkStateReader; @@ -76,7 +78,7 @@ public class OverseerCollectionProcessor implements Runnable { @Override public void run() { log.info("Process current queue of collection creations"); - while (amILeader()) { + while (amILeader() && !isClosed) { try { byte[] head = workQueue.peek(true); @@ -108,6 +110,10 @@ public class OverseerCollectionProcessor implements Runnable { } } + public void close() { + isClosed = true; + } + private boolean amILeader() { try { ZkNodeProps props = ZkNodeProps.load(zkStateReader.getZkClient().getData( diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index f84634bc7f1..101a2fa2076 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -121,6 +121,8 @@ public final class ZkController { // may accept defaults or use mocks rather than pulling things from a CoreContainer private CoreContainer cc; + protected volatile Overseer overseer; + /** * @param cc if null, recovery will not be enabled * @param zkServerAddress @@ -170,10 +172,8 @@ public final class ZkController { shardHandler = cc.getShardHandlerFactory().getShardHandler(); adminPath = cc.getAdminPath(); } - - ElectionContext context = new OverseerElectionContext( - shardHandler, adminPath, - getNodeName(), zkStateReader); + ZkController.this.overseer = new Overseer(shardHandler, adminPath, zkStateReader); + ElectionContext context = new OverseerElectionContext(zkClient, overseer, getNodeName()); overseerElector.joinElection(context); zkStateReader.createClusterStateWatchersAndUpdate(); @@ -242,6 +242,11 @@ public final class ZkController { * Closes the underlying ZooKeeper client. */ public void close() { + try { + overseer.close(); + } catch(Throwable t) { + log.error("Error closing overseer", t); + } try { zkClient.close(); } catch (InterruptedException e) { @@ -366,8 +371,8 @@ public final class ZkController { } overseerElector = new LeaderElector(zkClient); - ElectionContext context = new OverseerElectionContext(shardHandler, - adminPath, getNodeName(), zkStateReader); + this.overseer = new Overseer(shardHandler, adminPath, zkStateReader); + ElectionContext context = new OverseerElectionContext(zkClient, overseer, getNodeName()); overseerElector.setup(context); overseerElector.joinElection(context); zkStateReader.createClusterStateWatchersAndUpdate(); diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java index 9f25b0b96f2..9568b7b9fe2 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java @@ -47,6 +47,8 @@ class ConnectionManager implements Watcher { private OnReconnect onReconnect; + private volatile boolean isClosed = false; + public ConnectionManager(String name, SolrZkClient client, String zkServerAddress, int zkClientTimeout, ZkClientConnectionStrategy strat, OnReconnect onConnect) { this.name = name; this.client = client; @@ -68,6 +70,8 @@ class ConnectionManager implements Watcher { log.info("Watcher " + this + " name:" + name + " got event " + event + " path:" + event.getPath() + " type:" + event.getType()); } + + checkClosed(); state = event.getState(); if (state == KeeperState.SyncConnected) { @@ -81,11 +85,18 @@ class ConnectionManager implements Watcher { connectionStrategy.reconnect(zkServerAddress, zkClientTimeout, this, new ZkClientConnectionStrategy.ZkUpdate() { @Override - public void update(SolrZooKeeper keeper) - throws InterruptedException, TimeoutException { + public void update(SolrZooKeeper keeper) throws TimeoutException { synchronized (connectionStrategy) { - waitForConnected(SolrZkClient.DEFAULT_CLIENT_CONNECT_TIMEOUT); - client.updateKeeper(keeper); + checkClosed(); + try { + waitForConnected(SolrZkClient.DEFAULT_CLIENT_CONNECT_TIMEOUT); + checkClosed(); + client.updateKeeper(keeper); + } catch (InterruptedException e) { + // we must have been asked to stop + throw new RuntimeException("Giving up on connecting - we were interrupted"); + } + checkClosed(); if (onReconnect != null) { onReconnect.command(); } @@ -95,6 +106,7 @@ class ConnectionManager implements Watcher { } } + }); } catch (Exception e) { SolrException.log(log, "", e); @@ -109,7 +121,13 @@ class ConnectionManager implements Watcher { } public synchronized boolean isConnected() { - return connected; + return !isClosed && connected; + } + + // we use a volatile rather than sync + // to avoid deadlock on shutdown + public void close() { + this.isClosed = true; } public synchronized KeeperState state() { @@ -122,12 +140,20 @@ class ConnectionManager implements Watcher { long left = waitForConnection; while (!connected && left > 0) { wait(left); + checkClosed(); left = expire - System.currentTimeMillis(); } if (!connected) { throw new TimeoutException("Could not connect to ZooKeeper " + zkServerAddress + " within " + waitForConnection + " ms"); } } + + private synchronized void checkClosed() { + if (isClosed) { + log.info("Not acting because I am closed"); + return; + } + } public synchronized void waitForDisconnected(long timeout) throws InterruptedException, TimeoutException { diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java index 16d11cc1d2b..f88cad1910b 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java @@ -654,7 +654,11 @@ public class SolrZkClient { public void close() throws InterruptedException { if (isClosed) return; // it's okay if we over close - same as solrcore isClosed = true; - keeper.close(); + try { + keeper.close(); + } finally { + connManager.close(); + } numCloses.incrementAndGet(); }