diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index 8a5b3bed0ee..e4fbccb9839 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -247,15 +247,7 @@ public final class ZkController { } catch(Throwable t) { log.error("Error closing overseer", t); } - try { - zkClient.close(); - } catch (InterruptedException e) { - // Restore the interrupted status - Thread.currentThread().interrupt(); - log.warn("", e); - throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, - "", e); - } + zkClient.close(); } /** diff --git a/solr/core/src/java/org/apache/solr/servlet/ZookeeperInfoServlet.java b/solr/core/src/java/org/apache/solr/servlet/ZookeeperInfoServlet.java index 3cbca19efff..61890f7ecce 100644 --- a/solr/core/src/java/org/apache/solr/servlet/ZookeeperInfoServlet.java +++ b/solr/core/src/java/org/apache/solr/servlet/ZookeeperInfoServlet.java @@ -17,6 +17,16 @@ package org.apache.solr.servlet; +import java.io.IOException; +import java.io.PrintWriter; +import java.net.URLEncoder; +import java.util.Date; +import java.util.List; + +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + import org.apache.lucene.util.BytesRef; import org.apache.noggit.CharArr; import org.apache.noggit.JSONWriter; @@ -28,16 +38,6 @@ import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.servlet.http.HttpServlet; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import java.io.IOException; -import java.io.PrintWriter; -import java.net.URLEncoder; -import java.util.Date; -import java.util.List; -import java.util.concurrent.TimeoutException; - /** * Zookeeper Info @@ -148,13 +148,7 @@ public final class ZookeeperInfoServlet extends HttpServlet { try { zkClient = new SolrZkClient(addr, 10000); doClose = true; - } catch (TimeoutException e) { - writeError(503, "Could not connect to zookeeper at '" + addr + "'\""); - zkClient = null; - return; - } catch (InterruptedException e) { - // Restore the interrupted status - Thread.currentThread().interrupt(); + } catch (Exception e) { writeError(503, "Could not connect to zookeeper at '" + addr + "'\""); zkClient = null; return; @@ -163,12 +157,8 @@ public final class ZookeeperInfoServlet extends HttpServlet { } public void close() { - try { - if (doClose) { - zkClient.close(); - } - } catch (InterruptedException e) { - // ignore exception on close + if (doClose) { + zkClient.close(); } } diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java index 38581154abb..1b12c659586 100644 --- a/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java @@ -124,9 +124,21 @@ public class LeaderElectionTest extends SolrTestCaseJ4 { try { setupOnConnect(); } catch (InterruptedException e) { + log.error("setup failed", e); + + if (this.zkClient != null) { + this.zkClient.close(); + } + return; } catch (Throwable e) { - // e.printStackTrace(); + log.error("setup failed", e); + + if (this.zkClient != null) { + this.zkClient.close(); + } + + return; } while (!stop) { @@ -221,76 +233,77 @@ public class LeaderElectionTest extends SolrTestCaseJ4 { for (int i = 0; i < 15; i++) { ClientThread thread = new ClientThread(i); - threads.add(thread); } - - for (Thread thread : threads) { - thread.start(); - } - - - while(true) { //wait for election to complete - int doneCount = 0; - for (ClientThread thread : threads) { - if(thread.electionDone) { - doneCount++; + try { + for (Thread thread : threads) { + thread.start(); + } + + while (true) { // wait for election to complete + int doneCount = 0; + for (ClientThread thread : threads) { + if (thread.electionDone) { + doneCount++; + } } + if (doneCount == 15) { + break; + } + Thread.sleep(100); } - if(doneCount==15) { - break; + + int leaderThread = getLeaderThread(); + + // whoever the leader is, should be the n_0 seq + assertEquals(0, threads.get(leaderThread).seq); + + // kill n_0, 1, 3 and 4 + ((ClientThread) seqToThread.get(0)).close(); + + waitForLeader(threads, 1); + + leaderThread = getLeaderThread(); + + // whoever the leader is, should be the n_1 seq + + assertEquals(1, threads.get(leaderThread).seq); + + ((ClientThread) seqToThread.get(4)).close(); + ((ClientThread) seqToThread.get(1)).close(); + ((ClientThread) seqToThread.get(3)).close(); + + // whoever the leader is, should be the n_2 seq + + waitForLeader(threads, 2); + + leaderThread = getLeaderThread(); + assertEquals(2, threads.get(leaderThread).seq); + + // kill n_5, 2, 6, 7, and 8 + ((ClientThread) seqToThread.get(5)).close(); + ((ClientThread) seqToThread.get(2)).close(); + ((ClientThread) seqToThread.get(6)).close(); + ((ClientThread) seqToThread.get(7)).close(); + ((ClientThread) seqToThread.get(8)).close(); + + waitForLeader(threads, 9); + leaderThread = getLeaderThread(); + + // whoever the leader is, should be the n_9 seq + assertEquals(9, threads.get(leaderThread).seq); + + } finally { + // cleanup any threads still running + for (ClientThread thread : threads) { + thread.close(); + thread.interrupt(); + + } + + for (Thread thread : threads) { + thread.join(); } - Thread.sleep(100); - } - - int leaderThread = getLeaderThread(); - - // whoever the leader is, should be the n_0 seq - assertEquals(0, threads.get(leaderThread).seq); - - // kill n_0, 1, 3 and 4 - ((ClientThread) seqToThread.get(0)).close(); - - waitForLeader(threads, 1); - - leaderThread = getLeaderThread(); - - // whoever the leader is, should be the n_1 seq - - assertEquals(1, threads.get(leaderThread).seq); - - ((ClientThread) seqToThread.get(4)).close(); - ((ClientThread) seqToThread.get(1)).close(); - ((ClientThread) seqToThread.get(3)).close(); - - // whoever the leader is, should be the n_2 seq - - waitForLeader(threads, 2); - - leaderThread = getLeaderThread(); - assertEquals(2, threads.get(leaderThread).seq); - - // kill n_5, 2, 6, 7, and 8 - ((ClientThread) seqToThread.get(5)).close(); - ((ClientThread) seqToThread.get(2)).close(); - ((ClientThread) seqToThread.get(6)).close(); - ((ClientThread) seqToThread.get(7)).close(); - ((ClientThread) seqToThread.get(8)).close(); - - waitForLeader(threads, 9); - leaderThread = getLeaderThread(); - - // whoever the leader is, should be the n_9 seq - assertEquals(9, threads.get(leaderThread).seq); - - // cleanup any threads still running - for (ClientThread thread : threads) { - thread.close(); - thread.interrupt(); - } - - for (Thread thread : threads) { - thread.join(); } } diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java index 3dfabeec985..32ed58f65e5 100644 --- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java @@ -95,13 +95,9 @@ public class OverseerTest extends SolrTestCaseJ4 { } } - public void close(){ - try { - deleteNode(ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName); - zkClient.close(); - } catch (InterruptedException e) { - //e.printStackTrace(); - } + public void close() { + deleteNode(ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName); + zkClient.close(); } public void publishState(String coreName, String stateName, int numShards) diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java index bea6221d2a6..d9eefc69ac8 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java @@ -92,11 +92,22 @@ class ConnectionManager implements Watcher { synchronized (connectionStrategy) { try { waitForConnected(SolrZkClient.DEFAULT_CLIENT_CONNECT_TIMEOUT); + } catch (InterruptedException e1) { + closeKeeper(keeper); + Thread.currentThread().interrupt(); + throw new RuntimeException("Giving up on connecting - we were interrupted", e1); + } catch (Exception e1) { + closeKeeper(keeper); + throw new RuntimeException(e1); + } + + try { client.updateKeeper(keeper); } catch (InterruptedException e) { closeKeeper(keeper); + Thread.currentThread().interrupt(); // we must have been asked to stop - throw new RuntimeException("Giving up on connecting - we were interrupted"); + throw new RuntimeException(e); } catch(Throwable t) { closeKeeper(keeper); throw new RuntimeException(t); @@ -142,12 +153,12 @@ class ConnectionManager implements Watcher { public synchronized void waitForConnected(long waitForConnection) throws InterruptedException, TimeoutException { long expire = System.currentTimeMillis() + waitForConnection; - long left = waitForConnection; + long left = 1; while (!connected && left > 0) { - wait(left); if (isClosed) { break; } + wait(500); left = expire - System.currentTimeMillis(); } if (!connected) { diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java index 38297ac4549..730de649ddc 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java @@ -82,11 +82,11 @@ public class SolrZkClient { * @throws TimeoutException * @throws IOException */ - public SolrZkClient(String zkServerAddress, int zkClientTimeout) throws InterruptedException, TimeoutException, IOException { + public SolrZkClient(String zkServerAddress, int zkClientTimeout) { this(zkServerAddress, zkClientTimeout, new DefaultConnectionStrategy(), null); } - public SolrZkClient(String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, OnReconnect onReonnect) throws InterruptedException, TimeoutException, IOException { + public SolrZkClient(String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, OnReconnect onReonnect) { this(zkServerAddress, zkClientTimeout, new DefaultConnectionStrategy(), onReonnect, zkClientConnectTimeout); } @@ -100,8 +100,7 @@ public class SolrZkClient { * @throws IOException */ public SolrZkClient(String zkServerAddress, int zkClientTimeout, - ZkClientConnectionStrategy strat, final OnReconnect onReconnect) throws InterruptedException, - TimeoutException, IOException { + ZkClientConnectionStrategy strat, final OnReconnect onReconnect) { this(zkServerAddress, zkClientTimeout, strat, onReconnect, DEFAULT_CLIENT_CONNECT_TIMEOUT); } @@ -116,42 +115,46 @@ public class SolrZkClient { * @throws IOException */ public SolrZkClient(String zkServerAddress, int zkClientTimeout, - ZkClientConnectionStrategy strat, final OnReconnect onReconnect, int clientConnectTimeout) throws InterruptedException, - TimeoutException, IOException { + ZkClientConnectionStrategy strat, final OnReconnect onReconnect, int clientConnectTimeout) { connManager = new ConnectionManager("ZooKeeperConnection Watcher:" + zkServerAddress, this, zkServerAddress, zkClientTimeout, strat, onReconnect); - strat.connect(zkServerAddress, zkClientTimeout, connManager, - new ZkUpdate() { - @Override - public void update(SolrZooKeeper zooKeeper) { - SolrZooKeeper oldKeeper = keeper; - keeper = zooKeeper; - if (oldKeeper != null) { + try { + strat.connect(zkServerAddress, zkClientTimeout, connManager, + new ZkUpdate() { + @Override + public void update(SolrZooKeeper zooKeeper) { + SolrZooKeeper oldKeeper = keeper; + keeper = zooKeeper; try { - oldKeeper.close(); - } catch (InterruptedException e) { - // Restore the interrupted status - Thread.currentThread().interrupt(); - log.error("", e); - throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, - "", e); + closeKeeper(oldKeeper); + } finally { + if (isClosed) { + // we may have been closed + closeKeeper(SolrZkClient.this.keeper); + } } } - if (isClosed) { - // we may have been closed - try { - SolrZkClient.this.keeper.close(); - } catch (InterruptedException e) { - // Restore the interrupted status - Thread.currentThread().interrupt(); - log.error("", e); - throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, - "", e); - } - } - } - }); - connManager.waitForConnected(clientConnectTimeout); + }); + } catch (IOException e) { + connManager.close(); + throw new RuntimeException(); + } catch (InterruptedException e) { + connManager.close(); + throw new RuntimeException(); + } catch (TimeoutException e) { + connManager.close(); + throw new RuntimeException(); + } + try { + connManager.waitForConnected(clientConnectTimeout); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + connManager.close(); + throw new RuntimeException(); + } catch (TimeoutException e) { + connManager.close(); + throw new RuntimeException(); + } numOpens.incrementAndGet(); } @@ -663,11 +666,11 @@ public class SolrZkClient { /** * @throws InterruptedException */ - public void close() throws InterruptedException { + public void close() { if (isClosed) return; // it's okay if we over close - same as solrcore isClosed = true; try { - keeper.close(); + closeKeeper(keeper); } finally { connManager.close(); } @@ -685,18 +688,32 @@ public class SolrZkClient { * @throws InterruptedException */ void updateKeeper(SolrZooKeeper keeper) throws InterruptedException { - if (isClosed) throw new RuntimeException("client is closed"); SolrZooKeeper oldKeeper = this.keeper; this.keeper = keeper; if (oldKeeper != null) { oldKeeper.close(); } + // we might have been closed already if (isClosed) this.keeper.close(); } public SolrZooKeeper getSolrZooKeeper() { return keeper; } + + private void closeKeeper(SolrZooKeeper keeper) { + if (keeper != null) { + try { + keeper.close(); + } catch (InterruptedException e) { + // Restore the interrupted status + Thread.currentThread().interrupt(); + log.error("", e); + throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", + e); + } + } + } // yeah, it's recursive :( public void clean(String path) throws InterruptedException, KeeperException { diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java index f1b80162916..3dee5b503ca 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -373,15 +373,7 @@ public class ZkStateReader { public void close() { if (closeClient) { - try { - zkClient.close(); - } catch (InterruptedException e) { - // Restore the interrupted status - Thread.currentThread().interrupt(); - log.error("", e); - throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", - e); - } + zkClient.close(); } }