LUCENE-3985: add close methods to Overseer and ConnectionManager and use them

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1369236 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2012-08-03 22:34:08 +00:00
parent 034207be25
commit 7add2f7c62
6 changed files with 93 additions and 31 deletions

View File

@ -250,16 +250,13 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
final class OverseerElectionContext extends ElectionContext {
private final SolrZkClient zkClient;
private final ZkStateReader stateReader;
private ShardHandler shardHandler;
private String adminPath;
private Overseer overseer;
public OverseerElectionContext(ShardHandler shardHandler, String adminPath, final String zkNodeName, ZkStateReader stateReader) {
super(zkNodeName, "/overseer_elect", "/overseer_elect/leader", null, stateReader.getZkClient());
this.stateReader = stateReader;
this.shardHandler = shardHandler;
this.adminPath = adminPath;
this.zkClient = stateReader.getZkClient();
public OverseerElectionContext(SolrZkClient zkClient, Overseer overseer, final String zkNodeName) {
super(zkNodeName, "/overseer_elect", "/overseer_elect/leader", null, zkClient);
this.overseer = overseer;
this.zkClient = zkClient;
}
@Override
@ -281,7 +278,7 @@ final class OverseerElectionContext extends ElectionContext {
CreateMode.EPHEMERAL, true);
}
new Overseer(shardHandler, adminPath, stateReader, id);
overseer.start(id);
}
}

View File

@ -47,7 +47,7 @@ public class Overseer {
private static Logger log = LoggerFactory.getLogger(Overseer.class);
private static class CloudStateUpdater implements Runnable {
private class CloudStateUpdater implements Runnable {
private static final String DELETECORE = "deletecore";
private final ZkStateReader reader;
@ -70,7 +70,7 @@ public class Overseer {
@Override
public void run() {
if(amILeader()) {
if(amILeader() && !Overseer.this.isClosed) {
// see if there's something left from the previous Overseer and re
// process all events that were not persisted into cloud state
synchronized (reader.getUpdateLock()) { //XXX this only protects against edits inside single node
@ -110,7 +110,7 @@ public class Overseer {
}
log.info("Starting to work on the main queue");
while (amILeader()) {
while (amILeader() && !isClosed) {
synchronized (reader.getUpdateLock()) {
try {
byte[] head = stateUpdateQueue.peek();
@ -401,21 +401,45 @@ public class Overseer {
}
}
private Thread ccThread;
private Thread updaterThread;
private volatile boolean isClosed;
private ZkStateReader reader;
private ShardHandler shardHandler;
private String adminPath;
public Overseer(ShardHandler shardHandler, String adminPath, final ZkStateReader reader, final String id) throws KeeperException, InterruptedException {
public Overseer(ShardHandler shardHandler, String adminPath, final ZkStateReader reader) throws KeeperException, InterruptedException {
this.reader = reader;
this.shardHandler = shardHandler;
this.adminPath = adminPath;
}
public void start(String id) {
log.info("Overseer (id=" + id + ") starting");
createOverseerNode(reader.getZkClient());
//launch cluster state updater thread
ThreadGroup tg = new ThreadGroup("Overseer state updater.");
Thread updaterThread = new Thread(tg, new CloudStateUpdater(reader, id));
updaterThread = new Thread(tg, new CloudStateUpdater(reader, id));
updaterThread.setDaemon(true);
updaterThread.start();
ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process.");
Thread ccThread = new Thread(ccTg, new OverseerCollectionProcessor(reader, id, shardHandler, adminPath));
ccThread = new Thread(ccTg, new OverseerCollectionProcessor(reader, id, shardHandler, adminPath),
"Overseer-" + id);
ccThread.setDaemon(true);
updaterThread.start();
ccThread.start();
}
public void close() {
isClosed = true;
}
/**
* Get queue that can be used to send messages to Overseer.

View File

@ -64,6 +64,8 @@ public class OverseerCollectionProcessor implements Runnable {
private String adminPath;
private ZkStateReader zkStateReader;
private boolean isClosed;
public OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, ShardHandler shardHandler, String adminPath) {
this.zkStateReader = zkStateReader;
@ -76,7 +78,7 @@ public class OverseerCollectionProcessor implements Runnable {
@Override
public void run() {
log.info("Process current queue of collection creations");
while (amILeader()) {
while (amILeader() && !isClosed) {
try {
byte[] head = workQueue.peek(true);
@ -108,6 +110,10 @@ public class OverseerCollectionProcessor implements Runnable {
}
}
public void close() {
isClosed = true;
}
private boolean amILeader() {
try {
ZkNodeProps props = ZkNodeProps.load(zkStateReader.getZkClient().getData(

View File

@ -121,6 +121,8 @@ public final class ZkController {
// may accept defaults or use mocks rather than pulling things from a CoreContainer
private CoreContainer cc;
protected volatile Overseer overseer;
/**
* @param cc if null, recovery will not be enabled
* @param zkServerAddress
@ -170,10 +172,8 @@ public final class ZkController {
shardHandler = cc.getShardHandlerFactory().getShardHandler();
adminPath = cc.getAdminPath();
}
ElectionContext context = new OverseerElectionContext(
shardHandler, adminPath,
getNodeName(), zkStateReader);
ZkController.this.overseer = new Overseer(shardHandler, adminPath, zkStateReader);
ElectionContext context = new OverseerElectionContext(zkClient, overseer, getNodeName());
overseerElector.joinElection(context);
zkStateReader.createClusterStateWatchersAndUpdate();
@ -242,6 +242,11 @@ public final class ZkController {
* Closes the underlying ZooKeeper client.
*/
public void close() {
try {
overseer.close();
} catch(Throwable t) {
log.error("Error closing overseer", t);
}
try {
zkClient.close();
} catch (InterruptedException e) {
@ -366,8 +371,8 @@ public final class ZkController {
}
overseerElector = new LeaderElector(zkClient);
ElectionContext context = new OverseerElectionContext(shardHandler,
adminPath, getNodeName(), zkStateReader);
this.overseer = new Overseer(shardHandler, adminPath, zkStateReader);
ElectionContext context = new OverseerElectionContext(zkClient, overseer, getNodeName());
overseerElector.setup(context);
overseerElector.joinElection(context);
zkStateReader.createClusterStateWatchersAndUpdate();

View File

@ -47,6 +47,8 @@ class ConnectionManager implements Watcher {
private OnReconnect onReconnect;
private volatile boolean isClosed = false;
public ConnectionManager(String name, SolrZkClient client, String zkServerAddress, int zkClientTimeout, ZkClientConnectionStrategy strat, OnReconnect onConnect) {
this.name = name;
this.client = client;
@ -68,6 +70,8 @@ class ConnectionManager implements Watcher {
log.info("Watcher " + this + " name:" + name + " got event " + event
+ " path:" + event.getPath() + " type:" + event.getType());
}
checkClosed();
state = event.getState();
if (state == KeeperState.SyncConnected) {
@ -81,11 +85,18 @@ class ConnectionManager implements Watcher {
connectionStrategy.reconnect(zkServerAddress, zkClientTimeout, this,
new ZkClientConnectionStrategy.ZkUpdate() {
@Override
public void update(SolrZooKeeper keeper)
throws InterruptedException, TimeoutException {
public void update(SolrZooKeeper keeper) throws TimeoutException {
synchronized (connectionStrategy) {
waitForConnected(SolrZkClient.DEFAULT_CLIENT_CONNECT_TIMEOUT);
client.updateKeeper(keeper);
checkClosed();
try {
waitForConnected(SolrZkClient.DEFAULT_CLIENT_CONNECT_TIMEOUT);
checkClosed();
client.updateKeeper(keeper);
} catch (InterruptedException e) {
// we must have been asked to stop
throw new RuntimeException("Giving up on connecting - we were interrupted");
}
checkClosed();
if (onReconnect != null) {
onReconnect.command();
}
@ -95,6 +106,7 @@ class ConnectionManager implements Watcher {
}
}
});
} catch (Exception e) {
SolrException.log(log, "", e);
@ -109,7 +121,13 @@ class ConnectionManager implements Watcher {
}
public synchronized boolean isConnected() {
return connected;
return !isClosed && connected;
}
// we use a volatile rather than sync
// to avoid deadlock on shutdown
public void close() {
this.isClosed = true;
}
public synchronized KeeperState state() {
@ -122,12 +140,20 @@ class ConnectionManager implements Watcher {
long left = waitForConnection;
while (!connected && left > 0) {
wait(left);
checkClosed();
left = expire - System.currentTimeMillis();
}
if (!connected) {
throw new TimeoutException("Could not connect to ZooKeeper " + zkServerAddress + " within " + waitForConnection + " ms");
}
}
private synchronized void checkClosed() {
if (isClosed) {
log.info("Not acting because I am closed");
return;
}
}
public synchronized void waitForDisconnected(long timeout)
throws InterruptedException, TimeoutException {

View File

@ -654,7 +654,11 @@ public class SolrZkClient {
public void close() throws InterruptedException {
if (isClosed) return; // it's okay if we over close - same as solrcore
isClosed = true;
keeper.close();
try {
keeper.close();
} finally {
connManager.close();
}
numCloses.incrementAndGet();
}