From 6aeceaeb860061d1dc1f9c5417d1d2ec7e50990d Mon Sep 17 00:00:00 2001 From: Mark Robert Miller Date: Tue, 29 Jul 2014 03:04:35 +0000 Subject: [PATCH] SOLR-6261: Run ZooKeeper watch event callbacks in parallel to the ZooKeeper event thread. git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1614244 13f79535-47bb-0310-9956-ffa450edef68 --- solr/CHANGES.txt | 3 + .../org/apache/solr/cloud/LeaderElector.java | 9 +- .../apache/solr/cloud/LeaderElectionTest.java | 220 ++++++++++++------ .../apache/solr/cloud/ZkSolrClientTest.java | 157 ++++++++----- .../solr/common/cloud/SolrZkClient.java | 48 +++- 5 files changed, 291 insertions(+), 146 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index df1786c6cad..76ae64e7fad 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -242,6 +242,9 @@ Optimizations * SOLR-5968: BinaryResponseWriter fetches unnecessary stored fields when only pseudo-fields are requested. (Gregg Donovan via shalin) +* SOLR-6261: Run ZooKeeper watch event callbacks in parallel to the ZooKeeper + event thread. (Ramkumar Aiyengar via Mark Miller) + Other Changes --------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java index a12dfae44b7..4c7f9403d50 100644 --- a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java +++ b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java @@ -356,18 +356,12 @@ public class LeaderElector { try { // am I the next leader? checkIfIamLeader(seq, context, true); - } catch (InterruptedException e) { - // Restore the interrupted status - Thread.currentThread().interrupt(); - log.warn("", e); - } catch (IOException e) { - log.warn("", e); } catch (Exception e) { log.warn("", e); } } } - + /** * Set up any ZooKeeper nodes needed for leader election. */ @@ -402,4 +396,5 @@ public class LeaderElector { this.context = ctx; joinElection(ctx, true, joinAtHead); } + } diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java index b3f01c2d4af..fc7cac58b13 100644 --- a/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java @@ -78,64 +78,104 @@ public class LeaderElectionTest extends SolrTestCaseJ4 { seqToThread = Collections.synchronizedMap(new HashMap()); } - class ClientThread extends Thread { + class TestLeaderElectionContext extends ShardLeaderElectionContextBase { + private long runLeaderDelay = 0; + + public TestLeaderElectionContext(LeaderElector leaderElector, + String shardId, String collection, String coreNodeName, ZkNodeProps props, + ZkStateReader zkStateReader, long runLeaderDelay) { + super (leaderElector, shardId, collection, coreNodeName, props, zkStateReader); + this.runLeaderDelay = runLeaderDelay; + } + + @Override + void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStartMs) + throws KeeperException, InterruptedException, IOException { + super.runLeaderProcess(weAreReplacement, pauseBeforeStartMs); + if (runLeaderDelay > 0) { + log.info("Sleeping for " + runLeaderDelay + "ms to simulate leadership takeover delay"); + Thread.sleep(runLeaderDelay); + } + } + } + + class ElectorSetup { SolrZkClient zkClient; - private int nodeNumber; + ZkStateReader zkStateReader; + LeaderElector elector; + + public ElectorSetup(OnReconnect onReconnect) { + zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT, TIMEOUT, onReconnect); + zkStateReader = new ZkStateReader(zkClient); + elector = new LeaderElector(zkClient); + } + + public void close() { + if (!zkClient.isClosed()) { + zkClient.close(); + } + } + } + + class ClientThread extends Thread { + ElectorSetup es; + private String shard; + private String nodeName; + private long runLeaderDelay = 0; private volatile int seq = -1; private volatile boolean stop; private volatile boolean electionDone = false; private final ZkNodeProps props; - - public ClientThread(int nodeNumber) throws Exception { - super("Thread-" + nodeNumber); - + + public ClientThread(String shard, int nodeNumber) throws Exception { + this(null, shard, nodeNumber, 0); + } + + public ClientThread(ElectorSetup es, String shard, int nodeNumber, long runLeaderDelay) throws Exception { + super("Thread-" + shard + nodeNumber); + this.shard = shard; + this.nodeName = shard + nodeNumber; + this.runLeaderDelay = runLeaderDelay; + props = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, Integer.toString(nodeNumber), ZkStateReader.CORE_NAME_PROP, ""); - this.zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT, TIMEOUT, new OnReconnect() { - - @Override - public void command() { - try { - setupOnConnect(); - } catch (Throwable t) { - } - } - }); - this.nodeNumber = nodeNumber; + this.es = es; + if (this.es == null) { + this.es = new ElectorSetup(new OnReconnect() { + @Override + public void command() { + try { + setupOnConnect(); + } catch (Throwable t) { + } + } + }); + } } - + private void setupOnConnect() throws InterruptedException, KeeperException, IOException { - ZkStateReader zkStateReader = new ZkStateReader(zkClient); - LeaderElector elector = new LeaderElector(zkClient); - ShardLeaderElectionContextBase context = new ShardLeaderElectionContextBase( - elector, "shard1", "collection1", Integer.toString(nodeNumber), - props, zkStateReader); - elector.setup(context); - seq = elector.joinElection(context, false); + assertNotNull(es); + TestLeaderElectionContext context = new TestLeaderElectionContext( + es.elector, shard, "collection1", nodeName, + props, es.zkStateReader, runLeaderDelay); + es.elector.setup(context); + seq = es.elector.joinElection(context, false); electionDone = true; seqToThread.put(seq, this); } - + @Override public void run() { try { setupOnConnect(); } catch (InterruptedException e) { log.error("setup failed", e); - - if (this.zkClient != null) { - this.zkClient.close(); - } - + es.close(); return; } catch (Throwable e) { log.error("setup failed", e); - - if (this.zkClient != null) { - this.zkClient.close(); - } - + es.close(); return; } @@ -149,20 +189,14 @@ public class LeaderElectionTest extends SolrTestCaseJ4 { } - public void close() throws InterruptedException { - if (!zkClient.isClosed()) { - zkClient.close(); - } + public void close() { + es.close(); this.stop = true; } public int getSeq() { return seq; } - - public int getNodeNumber() { - return nodeNumber; - } } @Test @@ -224,32 +258,36 @@ public class LeaderElectionTest extends SolrTestCaseJ4 { throw new RuntimeException("Could not get leader props"); } + private static void startAndJoinElection (List threads) throws InterruptedException { + for (Thread thread : threads) { + thread.start(); + } + + while (true) { // wait for election to complete + int doneCount = 0; + for (ClientThread thread : threads) { + if (thread.electionDone) { + doneCount++; + } + } + if (doneCount == threads.size()) { + break; + } + Thread.sleep(100); + } + } + @Test public void testElection() throws Exception { List threads = new ArrayList<>(); for (int i = 0; i < 15; i++) { - ClientThread thread = new ClientThread(i); + ClientThread thread = new ClientThread("shard1", i); threads.add(thread); } try { - for (Thread thread : threads) { - thread.start(); - } - - while (true) { // wait for election to complete - int doneCount = 0; - for (ClientThread thread : threads) { - if (thread.electionDone) { - doneCount++; - } - } - if (doneCount == 15) { - break; - } - Thread.sleep(100); - } + startAndJoinElection(threads); int leaderThread = getLeaderThread(); @@ -306,6 +344,55 @@ public class LeaderElectionTest extends SolrTestCaseJ4 { } + @Test + public void testParallelElection() throws Exception { + final int numShards = 2 + random().nextInt(18); + log.info("Testing parallel election across " + numShards + " shards"); + + List threads = new ArrayList<>(); + + try { + List replica1s = new ArrayList<>(); + ElectorSetup es1 = new ElectorSetup(null); + for (int i = 1; i <= numShards; i++) { + ClientThread thread = new ClientThread(es1, "parshard" + i, 1, 0 /* don't delay */); + threads.add(thread); + replica1s.add(thread); + } + startAndJoinElection(replica1s); + log.info("First replicas brought up and registered"); + + // bring up second in line + List replica2s = new ArrayList<>(); + ElectorSetup es2 = new ElectorSetup(null); + for (int i = 1; i <= numShards; i++) { + ClientThread thread = new ClientThread(es2, "parshard" + i, 2, 40000 / (numShards - 1) /* delay enough to timeout or expire */); + threads.add(thread); + replica2s.add(thread); + } + startAndJoinElection(replica2s); + log.info("Second replicas brought up and registered"); + + // disconnect the leaders + es1.close(); + + for (int i = 1; i <= numShards; i ++) { + // if this test fails, getLeaderUrl will more likely throw an exception and fail the test, + // but add an assertEquals as well for good measure + assertEquals("2/", getLeaderUrl("collection1", "parshard" + i)); + } + } finally { + // cleanup any threads still running + for (ClientThread thread : threads) { + thread.close(); + thread.interrupt(); + } + for (Thread thread : threads) { + thread.join(); + } + } + } + private void waitForLeader(List threads, int seq) throws KeeperException, InterruptedException { int leaderThread; @@ -334,7 +421,7 @@ public class LeaderElectionTest extends SolrTestCaseJ4 { // start with a leader ClientThread thread1 = null; - thread1 = new ClientThread(0); + thread1 = new ClientThread("shard1", 0); threads.add(thread1); scheduler.schedule(thread1, 0, TimeUnit.MILLISECONDS); @@ -348,7 +435,7 @@ public class LeaderElectionTest extends SolrTestCaseJ4 { int launchIn = random().nextInt(500); ClientThread thread = null; try { - thread = new ClientThread(i); + thread = new ClientThread("shard1", i); } catch (Exception e) { // } @@ -375,10 +462,7 @@ public class LeaderElectionTest extends SolrTestCaseJ4 { } try { threads.get(j).close(); - } catch (InterruptedException e) { - throw e; } catch (Exception e) { - } Thread.sleep(10); @@ -398,7 +482,7 @@ public class LeaderElectionTest extends SolrTestCaseJ4 { int j; j = random().nextInt(threads.size()); try { - threads.get(j).zkClient.getSolrZooKeeper().pauseCnxn( + threads.get(j).es.zkClient.getSolrZooKeeper().pauseCnxn( ZkTestServer.TICK_TIME * 2); } catch (Exception e) { e.printStackTrace(); @@ -436,7 +520,7 @@ public class LeaderElectionTest extends SolrTestCaseJ4 { // cleanup any threads still running for (ClientThread thread : threads) { - thread.zkClient.getSolrZooKeeper().close(); + thread.es.zkClient.getSolrZooKeeper().close(); thread.close(); } diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkSolrClientTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkSolrClientTest.java index d6c0746801e..bf9ed47e519 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ZkSolrClientTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ZkSolrClientTest.java @@ -17,8 +17,8 @@ package org.apache.solr.cloud; * the License. */ -import java.io.File; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import junit.framework.Assert; @@ -38,61 +38,72 @@ public class ZkSolrClientTest extends AbstractSolrTestCase { public static void beforeClass() throws Exception { initCore("solrconfig.xml", "schema.xml"); } - + + class ZkConnection implements AutoCloseable { + + private ZkTestServer server = null; + private SolrZkClient zkClient = null; + + ZkConnection() throws Exception { + this (true); + } + + ZkConnection(boolean makeRoot) throws Exception { + String zkDir = createTempDir("zkData").getAbsolutePath(); + server = new ZkTestServer(zkDir); + server.run(); + + AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost()); + if (makeRoot) AbstractZkTestCase.makeSolrZkNode(server.getZkHost()); + + zkClient = new SolrZkClient(server.getZkAddress(), AbstractZkTestCase.TIMEOUT); + } + + public ZkTestServer getServer () { + return server; + } + + public SolrZkClient getClient () { + return zkClient; + } + + @Override + public void close() throws Exception { + if (zkClient != null) zkClient.close(); + if (server != null) server.shutdown(); + } + } + public void testConnect() throws Exception { - String zkDir = createTempDir("zkData").getAbsolutePath(); - ZkTestServer server = null; - - server = new ZkTestServer(zkDir); - server.run(); - AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost()); - SolrZkClient zkClient = new SolrZkClient(server.getZkAddress(), AbstractZkTestCase.TIMEOUT); - - zkClient.close(); - server.shutdown(); + try (ZkConnection conn = new ZkConnection (false)) { + // do nothing + } } public void testMakeRootNode() throws Exception { - String zkDir = createTempDir("zkData").getAbsolutePath(); - ZkTestServer server = null; - - server = new ZkTestServer(zkDir); - server.run(); - AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost()); - AbstractZkTestCase.makeSolrZkNode(server.getZkHost()); - - SolrZkClient zkClient = new SolrZkClient(server.getZkHost(), - AbstractZkTestCase.TIMEOUT); - - assertTrue(zkClient.exists("/solr", true)); - - zkClient.close(); - server.shutdown(); + try (ZkConnection conn = new ZkConnection ()) { + final SolrZkClient zkClient = new SolrZkClient(conn.getServer().getZkHost(), AbstractZkTestCase.TIMEOUT); + try { + assertTrue(zkClient.exists("/solr", true)); + } finally { + zkClient.close(); + } + } } - + public void testClean() throws Exception { - String zkDir = createTempDir("zkData").getAbsolutePath(); - ZkTestServer server = null; + try (ZkConnection conn = new ZkConnection ()) { + final SolrZkClient zkClient = conn.getClient(); - server = new ZkTestServer(zkDir); - server.run(); - AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost()); - + zkClient.makePath("/test/path/here", true); - SolrZkClient zkClient = new SolrZkClient(server.getZkHost(), - AbstractZkTestCase.TIMEOUT); + zkClient.makePath("/zz/path/here", true); - zkClient.makePath("/test/path/here", true); - - zkClient.makePath("/zz/path/here", true); - - zkClient.clean("/"); - - assertFalse(zkClient.exists("/test", true)); - assertFalse(zkClient.exists("/zz", true)); + zkClient.clean("/"); - zkClient.close(); - server.shutdown(); + assertFalse(zkClient.exists("/test", true)); + assertFalse(zkClient.exists("/zz", true)); + } } public void testReconnect() throws Exception { @@ -188,18 +199,44 @@ public class ZkSolrClientTest extends AbstractSolrTestCase { } } + public void testMultipleWatchesAsync() throws Exception { + try (ZkConnection conn = new ZkConnection ()) { + final SolrZkClient zkClient = conn.getClient(); + zkClient.makePath("/collections", true); + + final int numColls = random().nextInt(100); + final CountDownLatch latch = new CountDownLatch(numColls); + + for (int i = 1; i <= numColls; i ++) { + String collPath = "/collections/collection" + i; + zkClient.makePath(collPath, true); + zkClient.getChildren(collPath, new Watcher() { + @Override + public void process(WatchedEvent event) { + latch.countDown(); + try { + Thread.sleep(1000); + } + catch (InterruptedException e) {} + } + }, true); + } + + for (int i = 1; i <= numColls; i ++) { + String shardsPath = "/collections/collection" + i + "/shards"; + zkClient.makePath(shardsPath, true); + } + + assertTrue(latch.await(1000, TimeUnit.MILLISECONDS)); + } + } + public void testWatchChildren() throws Exception { - String zkDir = createTempDir("zkData").getAbsolutePath(); - - final AtomicInteger cnt = new AtomicInteger(); - ZkTestServer server = new ZkTestServer(zkDir); - server.run(); - AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost()); - Thread.sleep(400); - AbstractZkTestCase.makeSolrZkNode(server.getZkHost()); - final SolrZkClient zkClient = new SolrZkClient(server.getZkAddress(), AbstractZkTestCase.TIMEOUT); - try { + try (ZkConnection conn = new ZkConnection ()) { + final SolrZkClient zkClient = conn.getClient(); + final AtomicInteger cnt = new AtomicInteger(); final CountDownLatch latch = new CountDownLatch(1); + zkClient.makePath("/collections", true); zkClient.getChildren("/collections", new Watcher() { @@ -248,14 +285,6 @@ public class ZkSolrClientTest extends AbstractSolrTestCase { assertEquals(2, cnt.intValue()); - } finally { - - if (zkClient != null) { - zkClient.close(); - } - if (server != null) { - server.shutdown(); - } } } diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java index d8ea26859ed..841ad6247bb 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java @@ -24,6 +24,8 @@ import java.io.StringWriter; import java.io.UnsupportedEncodingException; import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; import javax.xml.transform.OutputKeys; @@ -36,10 +38,13 @@ import javax.xml.transform.stream.StreamSource; import org.apache.commons.io.FileUtils; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ZkClientConnectionStrategy.ZkUpdate; +import org.apache.solr.common.util.ExecutorUtil; +import org.apache.solr.common.util.SolrjNamedThreadFactory; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.KeeperException.NodeExistsException; +import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; @@ -72,10 +77,12 @@ public class SolrZkClient { private ZkCmdExecutor zkCmdExecutor; + private final ExecutorService zkCallbackExecutor = Executors.newCachedThreadPool(new SolrjNamedThreadFactory("zkCallback")); + private volatile boolean isClosed = false; private ZkClientConnectionStrategy zkClientConnectionStrategy; private int zkClientTimeout; - + public int getZkClientTimeout() { return zkClientTimeout; } @@ -183,6 +190,24 @@ public class SolrZkClient { } } + private Watcher wrapWatcher (final Watcher watcher) { + if (watcher == null) return watcher; + + // wrap the watcher so that it doesn't fire off ZK's event queue + return new Watcher() { + @Override + public void process(final WatchedEvent event) { + log.debug("Submitting job to respond to event " + event); + zkCallbackExecutor.submit(new Runnable () { + @Override + public void run () { + watcher.process(event); + } + }); + } + }; + } + /** * Return the stat of the node of the given path. Return null if no such a * node exists. @@ -206,11 +231,11 @@ public class SolrZkClient { return zkCmdExecutor.retryOperation(new ZkOperation() { @Override public Stat execute() throws KeeperException, InterruptedException { - return keeper.exists(path, watcher); + return keeper.exists(path, wrapWatcher(watcher)); } }); } else { - return keeper.exists(path, watcher); + return keeper.exists(path, wrapWatcher(watcher)); } } @@ -257,11 +282,11 @@ public class SolrZkClient { return zkCmdExecutor.retryOperation(new ZkOperation() { @Override public List execute() throws KeeperException, InterruptedException { - return keeper.getChildren(path, watcher); + return keeper.getChildren(path, wrapWatcher(watcher)); } }); } else { - return keeper.getChildren(path, watcher); + return keeper.getChildren(path, wrapWatcher(watcher)); } } @@ -274,11 +299,11 @@ public class SolrZkClient { return zkCmdExecutor.retryOperation(new ZkOperation() { @Override public byte[] execute() throws KeeperException, InterruptedException { - return keeper.getData(path, watcher, stat); + return keeper.getData(path, wrapWatcher(watcher), stat); } }); } else { - return keeper.getData(path, watcher, stat); + return keeper.getData(path, wrapWatcher(watcher), stat); } } @@ -570,6 +595,7 @@ public class SolrZkClient { closeKeeper(keeper); } finally { connManager.close(); + closeCallbackExecutor(); } numCloses.incrementAndGet(); } @@ -609,6 +635,14 @@ public class SolrZkClient { } } + private void closeCallbackExecutor() { + try { + ExecutorUtil.shutdownAndAwaitTermination(zkCallbackExecutor); + } catch (Exception e) { + SolrException.log(log, e); + } + } + // yeah, it's recursive :( public void clean(String path) throws InterruptedException, KeeperException { List children;