SOLR-3727: improve solrcloud close/shutdown

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1372626 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2012-08-13 21:22:08 +00:00
parent f08b7105b4
commit 2c31ae30e5
8 changed files with 115 additions and 32 deletions

View File

@ -25,6 +25,7 @@ import java.util.Map.Entry;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.ClosableThread;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
@ -47,7 +48,7 @@ public class Overseer {
private static Logger log = LoggerFactory.getLogger(Overseer.class);
private class ClusterStateUpdater implements Runnable {
private class ClusterStateUpdater implements Runnable, ClosableThread {
private static final String DELETECORE = "deletecore";
private final ZkStateReader reader;
@ -58,6 +59,7 @@ public class Overseer {
//Internal queue where overseer stores events that have not yet been published into cloudstate
//If Overseer dies while extracting the main queue a new overseer will start from this queue
private final DistributedQueue workQueue;
private volatile boolean isClosed;
public ClusterStateUpdater(final ZkStateReader reader, final String myId) {
this.zkClient = reader.getZkClient();
@ -70,7 +72,7 @@ public class Overseer {
@Override
public void run() {
if(amILeader() && !Overseer.this.isClosed) {
if(!this.isClosed && amILeader()) {
// see if there's something left from the previous Overseer and re
// process all events that were not persisted into cloud state
synchronized (reader.getUpdateLock()) { //XXX this only protects against edits inside single node
@ -110,7 +112,7 @@ public class Overseer {
}
log.info("Starting to work on the main queue");
while (amILeader() && !isClosed) {
while (!this.isClosed && amILeader()) {
synchronized (reader.getUpdateLock()) {
try {
byte[] head = stateUpdateQueue.peek();
@ -399,12 +401,48 @@ public class Overseer {
ClusterState newState = new ClusterState(clusterState.getLiveNodes(), newStates);
return newState;
}
@Override
public void close() {
this.isClosed = true;
}
@Override
public boolean isClosed() {
return this.isClosed;
}
}
private Thread ccThread;
class OverseerThread extends Thread implements ClosableThread {
private Thread updaterThread;
private volatile boolean isClosed;
public OverseerThread(ThreadGroup tg,
ClusterStateUpdater clusterStateUpdater) {
super(tg, clusterStateUpdater);
}
public OverseerThread(ThreadGroup ccTg,
OverseerCollectionProcessor overseerCollectionProcessor, String string) {
super(ccTg, overseerCollectionProcessor, string);
}
@Override
public void close() {
this.isClosed = true;
}
@Override
public boolean isClosed() {
return this.isClosed;
}
}
private OverseerThread ccThread;
private OverseerThread updaterThread;
private volatile boolean isClosed;
@ -425,11 +463,11 @@ public class Overseer {
createOverseerNode(reader.getZkClient());
//launch cluster state updater thread
ThreadGroup tg = new ThreadGroup("Overseer state updater.");
updaterThread = new Thread(tg, new ClusterStateUpdater(reader, id));
updaterThread = new OverseerThread(tg, new ClusterStateUpdater(reader, id));
updaterThread.setDaemon(true);
ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process.");
ccThread = new Thread(ccTg, new OverseerCollectionProcessor(reader, id, shardHandler, adminPath),
ccThread = new OverseerThread(ccTg, new OverseerCollectionProcessor(reader, id, shardHandler, adminPath),
"Overseer-" + id);
ccThread.setDaemon(true);
@ -439,6 +477,12 @@ public class Overseer {
public void close() {
isClosed = true;
if (updaterThread != null) {
updaterThread.close();
}
if (ccThread != null) {
ccThread.close();
}
}
/**

View File

@ -31,7 +31,7 @@ import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.SafeStopThread;
import org.apache.solr.common.cloud.ClosableThread;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
@ -56,7 +56,7 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RecoveryStrategy extends Thread implements SafeStopThread {
public class RecoveryStrategy extends Thread implements ClosableThread {
private static final int MAX_RETRIES = 500;
private static final int INTERRUPTED = MAX_RETRIES + 1;
private static final int STARTING_RECOVERY_DELAY = 1000;

View File

@ -17,7 +17,7 @@ package org.apache.solr.common.cloud;
* limitations under the License.
*/
public interface SafeStopThread {
public void stop();
public interface ClosableThread {
public void close();
public boolean isClosed();
}

View File

@ -71,7 +71,9 @@ class ConnectionManager implements Watcher {
+ " path:" + event.getPath() + " type:" + event.getType());
}
checkClosed();
if (isClosed) {
return;
}
state = event.getState();
if (state == KeeperState.SyncConnected) {
@ -85,18 +87,21 @@ class ConnectionManager implements Watcher {
connectionStrategy.reconnect(zkServerAddress, zkClientTimeout, this,
new ZkClientConnectionStrategy.ZkUpdate() {
@Override
public void update(SolrZooKeeper keeper) throws TimeoutException {
public void update(SolrZooKeeper keeper) {
// if keeper does not replace oldKeeper we must be sure to close it
synchronized (connectionStrategy) {
checkClosed();
try {
waitForConnected(SolrZkClient.DEFAULT_CLIENT_CONNECT_TIMEOUT);
checkClosed();
client.updateKeeper(keeper);
} catch (InterruptedException e) {
closeKeeper(keeper);
// we must have been asked to stop
throw new RuntimeException("Giving up on connecting - we were interrupted");
} catch(Throwable t) {
closeKeeper(keeper);
throw new RuntimeException(t);
}
checkClosed();
if (onReconnect != null) {
onReconnect.command();
}
@ -140,20 +145,15 @@ class ConnectionManager implements Watcher {
long left = waitForConnection;
while (!connected && left > 0) {
wait(left);
checkClosed();
if (isClosed) {
break;
}
left = expire - System.currentTimeMillis();
}
if (!connected) {
throw new TimeoutException("Could not connect to ZooKeeper " + zkServerAddress + " within " + waitForConnection + " ms");
}
}
private synchronized void checkClosed() {
if (isClosed) {
log.info("Not acting because I am closed");
return;
}
}
public synchronized void waitForDisconnected(long timeout)
throws InterruptedException, TimeoutException {
@ -167,4 +167,16 @@ class ConnectionManager implements Watcher {
throw new TimeoutException("Did not disconnect");
}
}
private void closeKeeper(SolrZooKeeper keeper) {
try {
keeper.close();
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"", e);
}
}
}

View File

@ -137,6 +137,18 @@ public class SolrZkClient {
"", e);
}
}
if (isClosed) {
// we may have been closed
try {
SolrZkClient.this.keeper.close();
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"", e);
}
}
}
});
connManager.waitForConnected(clientConnectTimeout);
@ -673,11 +685,13 @@ public class SolrZkClient {
* @throws InterruptedException
*/
void updateKeeper(SolrZooKeeper keeper) throws InterruptedException {
if (isClosed) throw new RuntimeException("client is closed");
SolrZooKeeper oldKeeper = this.keeper;
this.keeper = keeper;
if (oldKeeper != null) {
oldKeeper.close();
}
if (isClosed) this.keeper.close();
}
public SolrZooKeeper getSolrZooKeeper() {

View File

@ -73,9 +73,9 @@ public class ZkCmdExecutor {
Thread.currentThread().interrupt();
throw new InterruptedException();
}
if (Thread.currentThread() instanceof SafeStopThread) {
if (((SafeStopThread) Thread.currentThread()).isClosed()) {
throw new RuntimeException("Interrupted");
if (Thread.currentThread() instanceof ClosableThread) {
if (((ClosableThread) Thread.currentThread()).isClosed()) {
throw exception;
}
}
retryDelay(i);

View File

@ -19,16 +19,23 @@ package org.apache.zookeeper;
import java.io.IOException;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
// we use this class to expose nasty stuff for tests
public class SolrZooKeeper extends ZooKeeper {
List<Thread> spawnedThreads = new CopyOnWriteArrayList<Thread>();
// for test debug
//static Map<SolrZooKeeper,Exception> clients = new ConcurrentHashMap<SolrZooKeeper,Exception>();
public SolrZooKeeper(String connectString, int sessionTimeout,
Watcher watcher) throws IOException {
super(connectString, sessionTimeout, watcher);
//clients.put(this, new RuntimeException());
}
public ClientCnxn getConnection() {
@ -64,9 +71,20 @@ public class SolrZooKeeper extends ZooKeeper {
@Override
public synchronized void close() throws InterruptedException {
//clients.remove(this);
for (Thread t : spawnedThreads) {
t.interrupt();
}
super.close();
}
// public static void assertCloses() {
// if (clients.size() > 0) {
// Iterator<Exception> stacktraces = clients.values().iterator();
// Exception cause = null;
// cause = stacktraces.next();
// throw new RuntimeException("Found a bad one!", cause);
// }
// }
}

View File

@ -31,9 +31,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.lucene.util.LuceneTestCase.BadApple;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
@ -42,7 +40,6 @@ import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.ChaosMonkey;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
@ -65,8 +62,6 @@ import org.slf4j.LoggerFactory;
* what we test now - the default update chain
*/
@Slow
@BadApple
@AwaitsFix(bugUrl = "SOLR-3727 (leak threads)")
public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTestBase {
static Logger log = LoggerFactory.getLogger(AbstractFullDistribZkTestBase.class);