SOLR-3727: improve solrcloud close/shutdown

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1372701 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2012-08-14 01:31:27 +00:00
parent 680449363f
commit d4b66af3b3
7 changed files with 166 additions and 155 deletions

View File

@ -247,15 +247,7 @@ public final class ZkController {
} catch(Throwable t) { } catch(Throwable t) {
log.error("Error closing overseer", t); log.error("Error closing overseer", t);
} }
try { zkClient.close();
zkClient.close();
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
log.warn("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"", e);
}
} }
/** /**

View File

@ -17,6 +17,16 @@
package org.apache.solr.servlet; package org.apache.solr.servlet;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.URLEncoder;
import java.util.Date;
import java.util.List;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
import org.apache.noggit.CharArr; import org.apache.noggit.CharArr;
import org.apache.noggit.JSONWriter; import org.apache.noggit.JSONWriter;
@ -28,16 +38,6 @@ import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.URLEncoder;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeoutException;
/** /**
* Zookeeper Info * Zookeeper Info
@ -148,13 +148,7 @@ public final class ZookeeperInfoServlet extends HttpServlet {
try { try {
zkClient = new SolrZkClient(addr, 10000); zkClient = new SolrZkClient(addr, 10000);
doClose = true; doClose = true;
} catch (TimeoutException e) { } catch (Exception e) {
writeError(503, "Could not connect to zookeeper at '" + addr + "'\"");
zkClient = null;
return;
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
writeError(503, "Could not connect to zookeeper at '" + addr + "'\""); writeError(503, "Could not connect to zookeeper at '" + addr + "'\"");
zkClient = null; zkClient = null;
return; return;
@ -163,12 +157,8 @@ public final class ZookeeperInfoServlet extends HttpServlet {
} }
public void close() { public void close() {
try { if (doClose) {
if (doClose) { zkClient.close();
zkClient.close();
}
} catch (InterruptedException e) {
// ignore exception on close
} }
} }

View File

@ -124,9 +124,21 @@ public class LeaderElectionTest extends SolrTestCaseJ4 {
try { try {
setupOnConnect(); setupOnConnect();
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.error("setup failed", e);
if (this.zkClient != null) {
this.zkClient.close();
}
return; return;
} catch (Throwable e) { } catch (Throwable e) {
// e.printStackTrace(); log.error("setup failed", e);
if (this.zkClient != null) {
this.zkClient.close();
}
return;
} }
while (!stop) { while (!stop) {
@ -221,76 +233,77 @@ public class LeaderElectionTest extends SolrTestCaseJ4 {
for (int i = 0; i < 15; i++) { for (int i = 0; i < 15; i++) {
ClientThread thread = new ClientThread(i); ClientThread thread = new ClientThread(i);
threads.add(thread); threads.add(thread);
} }
try {
for (Thread thread : threads) { for (Thread thread : threads) {
thread.start(); thread.start();
} }
while (true) { // wait for election to complete
while(true) { //wait for election to complete int doneCount = 0;
int doneCount = 0; for (ClientThread thread : threads) {
for (ClientThread thread : threads) { if (thread.electionDone) {
if(thread.electionDone) { doneCount++;
doneCount++; }
} }
if (doneCount == 15) {
break;
}
Thread.sleep(100);
} }
if(doneCount==15) {
break; int leaderThread = getLeaderThread();
// whoever the leader is, should be the n_0 seq
assertEquals(0, threads.get(leaderThread).seq);
// kill n_0, 1, 3 and 4
((ClientThread) seqToThread.get(0)).close();
waitForLeader(threads, 1);
leaderThread = getLeaderThread();
// whoever the leader is, should be the n_1 seq
assertEquals(1, threads.get(leaderThread).seq);
((ClientThread) seqToThread.get(4)).close();
((ClientThread) seqToThread.get(1)).close();
((ClientThread) seqToThread.get(3)).close();
// whoever the leader is, should be the n_2 seq
waitForLeader(threads, 2);
leaderThread = getLeaderThread();
assertEquals(2, threads.get(leaderThread).seq);
// kill n_5, 2, 6, 7, and 8
((ClientThread) seqToThread.get(5)).close();
((ClientThread) seqToThread.get(2)).close();
((ClientThread) seqToThread.get(6)).close();
((ClientThread) seqToThread.get(7)).close();
((ClientThread) seqToThread.get(8)).close();
waitForLeader(threads, 9);
leaderThread = getLeaderThread();
// whoever the leader is, should be the n_9 seq
assertEquals(9, threads.get(leaderThread).seq);
} finally {
// cleanup any threads still running
for (ClientThread thread : threads) {
thread.close();
thread.interrupt();
}
for (Thread thread : threads) {
thread.join();
} }
Thread.sleep(100);
}
int leaderThread = getLeaderThread();
// whoever the leader is, should be the n_0 seq
assertEquals(0, threads.get(leaderThread).seq);
// kill n_0, 1, 3 and 4
((ClientThread) seqToThread.get(0)).close();
waitForLeader(threads, 1);
leaderThread = getLeaderThread();
// whoever the leader is, should be the n_1 seq
assertEquals(1, threads.get(leaderThread).seq);
((ClientThread) seqToThread.get(4)).close();
((ClientThread) seqToThread.get(1)).close();
((ClientThread) seqToThread.get(3)).close();
// whoever the leader is, should be the n_2 seq
waitForLeader(threads, 2);
leaderThread = getLeaderThread();
assertEquals(2, threads.get(leaderThread).seq);
// kill n_5, 2, 6, 7, and 8
((ClientThread) seqToThread.get(5)).close();
((ClientThread) seqToThread.get(2)).close();
((ClientThread) seqToThread.get(6)).close();
((ClientThread) seqToThread.get(7)).close();
((ClientThread) seqToThread.get(8)).close();
waitForLeader(threads, 9);
leaderThread = getLeaderThread();
// whoever the leader is, should be the n_9 seq
assertEquals(9, threads.get(leaderThread).seq);
// cleanup any threads still running
for (ClientThread thread : threads) {
thread.close();
thread.interrupt();
}
for (Thread thread : threads) {
thread.join();
} }
} }

View File

@ -95,13 +95,9 @@ public class OverseerTest extends SolrTestCaseJ4 {
} }
} }
public void close(){ public void close() {
try { deleteNode(ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName);
deleteNode(ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName); zkClient.close();
zkClient.close();
} catch (InterruptedException e) {
//e.printStackTrace();
}
} }
public void publishState(String coreName, String stateName, int numShards) public void publishState(String coreName, String stateName, int numShards)

View File

@ -92,11 +92,22 @@ class ConnectionManager implements Watcher {
synchronized (connectionStrategy) { synchronized (connectionStrategy) {
try { try {
waitForConnected(SolrZkClient.DEFAULT_CLIENT_CONNECT_TIMEOUT); waitForConnected(SolrZkClient.DEFAULT_CLIENT_CONNECT_TIMEOUT);
} catch (InterruptedException e1) {
closeKeeper(keeper);
Thread.currentThread().interrupt();
throw new RuntimeException("Giving up on connecting - we were interrupted", e1);
} catch (Exception e1) {
closeKeeper(keeper);
throw new RuntimeException(e1);
}
try {
client.updateKeeper(keeper); client.updateKeeper(keeper);
} catch (InterruptedException e) { } catch (InterruptedException e) {
closeKeeper(keeper); closeKeeper(keeper);
Thread.currentThread().interrupt();
// we must have been asked to stop // we must have been asked to stop
throw new RuntimeException("Giving up on connecting - we were interrupted"); throw new RuntimeException(e);
} catch(Throwable t) { } catch(Throwable t) {
closeKeeper(keeper); closeKeeper(keeper);
throw new RuntimeException(t); throw new RuntimeException(t);
@ -142,12 +153,12 @@ class ConnectionManager implements Watcher {
public synchronized void waitForConnected(long waitForConnection) public synchronized void waitForConnected(long waitForConnection)
throws InterruptedException, TimeoutException { throws InterruptedException, TimeoutException {
long expire = System.currentTimeMillis() + waitForConnection; long expire = System.currentTimeMillis() + waitForConnection;
long left = waitForConnection; long left = 1;
while (!connected && left > 0) { while (!connected && left > 0) {
wait(left);
if (isClosed) { if (isClosed) {
break; break;
} }
wait(500);
left = expire - System.currentTimeMillis(); left = expire - System.currentTimeMillis();
} }
if (!connected) { if (!connected) {

View File

@ -82,11 +82,11 @@ public class SolrZkClient {
* @throws TimeoutException * @throws TimeoutException
* @throws IOException * @throws IOException
*/ */
public SolrZkClient(String zkServerAddress, int zkClientTimeout) throws InterruptedException, TimeoutException, IOException { public SolrZkClient(String zkServerAddress, int zkClientTimeout) {
this(zkServerAddress, zkClientTimeout, new DefaultConnectionStrategy(), null); this(zkServerAddress, zkClientTimeout, new DefaultConnectionStrategy(), null);
} }
public SolrZkClient(String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, OnReconnect onReonnect) throws InterruptedException, TimeoutException, IOException { public SolrZkClient(String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, OnReconnect onReonnect) {
this(zkServerAddress, zkClientTimeout, new DefaultConnectionStrategy(), onReonnect, zkClientConnectTimeout); this(zkServerAddress, zkClientTimeout, new DefaultConnectionStrategy(), onReonnect, zkClientConnectTimeout);
} }
@ -100,8 +100,7 @@ public class SolrZkClient {
* @throws IOException * @throws IOException
*/ */
public SolrZkClient(String zkServerAddress, int zkClientTimeout, public SolrZkClient(String zkServerAddress, int zkClientTimeout,
ZkClientConnectionStrategy strat, final OnReconnect onReconnect) throws InterruptedException, ZkClientConnectionStrategy strat, final OnReconnect onReconnect) {
TimeoutException, IOException {
this(zkServerAddress, zkClientTimeout, strat, onReconnect, DEFAULT_CLIENT_CONNECT_TIMEOUT); this(zkServerAddress, zkClientTimeout, strat, onReconnect, DEFAULT_CLIENT_CONNECT_TIMEOUT);
} }
@ -116,42 +115,46 @@ public class SolrZkClient {
* @throws IOException * @throws IOException
*/ */
public SolrZkClient(String zkServerAddress, int zkClientTimeout, public SolrZkClient(String zkServerAddress, int zkClientTimeout,
ZkClientConnectionStrategy strat, final OnReconnect onReconnect, int clientConnectTimeout) throws InterruptedException, ZkClientConnectionStrategy strat, final OnReconnect onReconnect, int clientConnectTimeout) {
TimeoutException, IOException {
connManager = new ConnectionManager("ZooKeeperConnection Watcher:" connManager = new ConnectionManager("ZooKeeperConnection Watcher:"
+ zkServerAddress, this, zkServerAddress, zkClientTimeout, strat, onReconnect); + zkServerAddress, this, zkServerAddress, zkClientTimeout, strat, onReconnect);
strat.connect(zkServerAddress, zkClientTimeout, connManager, try {
new ZkUpdate() { strat.connect(zkServerAddress, zkClientTimeout, connManager,
@Override new ZkUpdate() {
public void update(SolrZooKeeper zooKeeper) { @Override
SolrZooKeeper oldKeeper = keeper; public void update(SolrZooKeeper zooKeeper) {
keeper = zooKeeper; SolrZooKeeper oldKeeper = keeper;
if (oldKeeper != null) { keeper = zooKeeper;
try { try {
oldKeeper.close(); closeKeeper(oldKeeper);
} catch (InterruptedException e) { } finally {
// Restore the interrupted status if (isClosed) {
Thread.currentThread().interrupt(); // we may have been closed
log.error("", e); closeKeeper(SolrZkClient.this.keeper);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, }
"", e);
} }
} }
if (isClosed) { });
// we may have been closed } catch (IOException e) {
try { connManager.close();
SolrZkClient.this.keeper.close(); throw new RuntimeException();
} catch (InterruptedException e) { } catch (InterruptedException e) {
// Restore the interrupted status connManager.close();
Thread.currentThread().interrupt(); throw new RuntimeException();
log.error("", e); } catch (TimeoutException e) {
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, connManager.close();
"", e); throw new RuntimeException();
} }
} try {
} connManager.waitForConnected(clientConnectTimeout);
}); } catch (InterruptedException e) {
connManager.waitForConnected(clientConnectTimeout); Thread.currentThread().interrupt();
connManager.close();
throw new RuntimeException();
} catch (TimeoutException e) {
connManager.close();
throw new RuntimeException();
}
numOpens.incrementAndGet(); numOpens.incrementAndGet();
} }
@ -663,11 +666,11 @@ public class SolrZkClient {
/** /**
* @throws InterruptedException * @throws InterruptedException
*/ */
public void close() throws InterruptedException { public void close() {
if (isClosed) return; // it's okay if we over close - same as solrcore if (isClosed) return; // it's okay if we over close - same as solrcore
isClosed = true; isClosed = true;
try { try {
keeper.close(); closeKeeper(keeper);
} finally { } finally {
connManager.close(); connManager.close();
} }
@ -685,18 +688,32 @@ public class SolrZkClient {
* @throws InterruptedException * @throws InterruptedException
*/ */
void updateKeeper(SolrZooKeeper keeper) throws InterruptedException { void updateKeeper(SolrZooKeeper keeper) throws InterruptedException {
if (isClosed) throw new RuntimeException("client is closed");
SolrZooKeeper oldKeeper = this.keeper; SolrZooKeeper oldKeeper = this.keeper;
this.keeper = keeper; this.keeper = keeper;
if (oldKeeper != null) { if (oldKeeper != null) {
oldKeeper.close(); oldKeeper.close();
} }
// we might have been closed already
if (isClosed) this.keeper.close(); if (isClosed) this.keeper.close();
} }
public SolrZooKeeper getSolrZooKeeper() { public SolrZooKeeper getSolrZooKeeper() {
return keeper; return keeper;
} }
private void closeKeeper(SolrZooKeeper keeper) {
if (keeper != null) {
try {
keeper.close();
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
e);
}
}
}
// yeah, it's recursive :( // yeah, it's recursive :(
public void clean(String path) throws InterruptedException, KeeperException { public void clean(String path) throws InterruptedException, KeeperException {

View File

@ -373,15 +373,7 @@ public class ZkStateReader {
public void close() { public void close() {
if (closeClient) { if (closeClient) {
try { zkClient.close();
zkClient.close();
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
e);
}
} }
} }