SOLR-6261: Run ZooKeeper watch event callbacks in parallel to the ZooKeeper event thread.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1614244 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2014-07-29 03:04:35 +00:00
parent dcb260ad14
commit 6aeceaeb86
5 changed files with 291 additions and 146 deletions

View File

@ -242,6 +242,9 @@ Optimizations
* SOLR-5968: BinaryResponseWriter fetches unnecessary stored fields when only pseudo-fields
are requested. (Gregg Donovan via shalin)
* SOLR-6261: Run ZooKeeper watch event callbacks in parallel to the ZooKeeper
event thread. (Ramkumar Aiyengar via Mark Miller)
Other Changes
---------------------

View File

@ -356,18 +356,12 @@ public class LeaderElector {
try {
// am I the next leader?
checkIfIamLeader(seq, context, true);
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
log.warn("", e);
} catch (IOException e) {
log.warn("", e);
} catch (Exception e) {
log.warn("", e);
}
}
}
/**
* Set up any ZooKeeper nodes needed for leader election.
*/
@ -402,4 +396,5 @@ public class LeaderElector {
this.context = ctx;
joinElection(ctx, true, joinAtHead);
}
}

View File

@ -78,64 +78,104 @@ public class LeaderElectionTest extends SolrTestCaseJ4 {
seqToThread = Collections.synchronizedMap(new HashMap<Integer,Thread>());
}
class ClientThread extends Thread {
class TestLeaderElectionContext extends ShardLeaderElectionContextBase {
private long runLeaderDelay = 0;
public TestLeaderElectionContext(LeaderElector leaderElector,
String shardId, String collection, String coreNodeName, ZkNodeProps props,
ZkStateReader zkStateReader, long runLeaderDelay) {
super (leaderElector, shardId, collection, coreNodeName, props, zkStateReader);
this.runLeaderDelay = runLeaderDelay;
}
@Override
void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStartMs)
throws KeeperException, InterruptedException, IOException {
super.runLeaderProcess(weAreReplacement, pauseBeforeStartMs);
if (runLeaderDelay > 0) {
log.info("Sleeping for " + runLeaderDelay + "ms to simulate leadership takeover delay");
Thread.sleep(runLeaderDelay);
}
}
}
class ElectorSetup {
SolrZkClient zkClient;
private int nodeNumber;
ZkStateReader zkStateReader;
LeaderElector elector;
public ElectorSetup(OnReconnect onReconnect) {
zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT, TIMEOUT, onReconnect);
zkStateReader = new ZkStateReader(zkClient);
elector = new LeaderElector(zkClient);
}
public void close() {
if (!zkClient.isClosed()) {
zkClient.close();
}
}
}
class ClientThread extends Thread {
ElectorSetup es;
private String shard;
private String nodeName;
private long runLeaderDelay = 0;
private volatile int seq = -1;
private volatile boolean stop;
private volatile boolean electionDone = false;
private final ZkNodeProps props;
public ClientThread(int nodeNumber) throws Exception {
super("Thread-" + nodeNumber);
public ClientThread(String shard, int nodeNumber) throws Exception {
this(null, shard, nodeNumber, 0);
}
public ClientThread(ElectorSetup es, String shard, int nodeNumber, long runLeaderDelay) throws Exception {
super("Thread-" + shard + nodeNumber);
this.shard = shard;
this.nodeName = shard + nodeNumber;
this.runLeaderDelay = runLeaderDelay;
props = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, Integer.toString(nodeNumber), ZkStateReader.CORE_NAME_PROP, "");
this.zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT, TIMEOUT, new OnReconnect() {
@Override
public void command() {
try {
setupOnConnect();
} catch (Throwable t) {
}
}
});
this.nodeNumber = nodeNumber;
this.es = es;
if (this.es == null) {
this.es = new ElectorSetup(new OnReconnect() {
@Override
public void command() {
try {
setupOnConnect();
} catch (Throwable t) {
}
}
});
}
}
private void setupOnConnect() throws InterruptedException, KeeperException,
IOException {
ZkStateReader zkStateReader = new ZkStateReader(zkClient);
LeaderElector elector = new LeaderElector(zkClient);
ShardLeaderElectionContextBase context = new ShardLeaderElectionContextBase(
elector, "shard1", "collection1", Integer.toString(nodeNumber),
props, zkStateReader);
elector.setup(context);
seq = elector.joinElection(context, false);
assertNotNull(es);
TestLeaderElectionContext context = new TestLeaderElectionContext(
es.elector, shard, "collection1", nodeName,
props, es.zkStateReader, runLeaderDelay);
es.elector.setup(context);
seq = es.elector.joinElection(context, false);
electionDone = true;
seqToThread.put(seq, this);
}
@Override
public void run() {
try {
setupOnConnect();
} catch (InterruptedException e) {
log.error("setup failed", e);
if (this.zkClient != null) {
this.zkClient.close();
}
es.close();
return;
} catch (Throwable e) {
log.error("setup failed", e);
if (this.zkClient != null) {
this.zkClient.close();
}
es.close();
return;
}
@ -149,20 +189,14 @@ public class LeaderElectionTest extends SolrTestCaseJ4 {
}
public void close() throws InterruptedException {
if (!zkClient.isClosed()) {
zkClient.close();
}
public void close() {
es.close();
this.stop = true;
}
public int getSeq() {
return seq;
}
public int getNodeNumber() {
return nodeNumber;
}
}
@Test
@ -224,32 +258,36 @@ public class LeaderElectionTest extends SolrTestCaseJ4 {
throw new RuntimeException("Could not get leader props");
}
private static void startAndJoinElection (List<ClientThread> threads) throws InterruptedException {
for (Thread thread : threads) {
thread.start();
}
while (true) { // wait for election to complete
int doneCount = 0;
for (ClientThread thread : threads) {
if (thread.electionDone) {
doneCount++;
}
}
if (doneCount == threads.size()) {
break;
}
Thread.sleep(100);
}
}
@Test
public void testElection() throws Exception {
List<ClientThread> threads = new ArrayList<>();
for (int i = 0; i < 15; i++) {
ClientThread thread = new ClientThread(i);
ClientThread thread = new ClientThread("shard1", i);
threads.add(thread);
}
try {
for (Thread thread : threads) {
thread.start();
}
while (true) { // wait for election to complete
int doneCount = 0;
for (ClientThread thread : threads) {
if (thread.electionDone) {
doneCount++;
}
}
if (doneCount == 15) {
break;
}
Thread.sleep(100);
}
startAndJoinElection(threads);
int leaderThread = getLeaderThread();
@ -306,6 +344,55 @@ public class LeaderElectionTest extends SolrTestCaseJ4 {
}
@Test
public void testParallelElection() throws Exception {
final int numShards = 2 + random().nextInt(18);
log.info("Testing parallel election across " + numShards + " shards");
List<ClientThread> threads = new ArrayList<>();
try {
List<ClientThread> replica1s = new ArrayList<>();
ElectorSetup es1 = new ElectorSetup(null);
for (int i = 1; i <= numShards; i++) {
ClientThread thread = new ClientThread(es1, "parshard" + i, 1, 0 /* don't delay */);
threads.add(thread);
replica1s.add(thread);
}
startAndJoinElection(replica1s);
log.info("First replicas brought up and registered");
// bring up second in line
List<ClientThread> replica2s = new ArrayList<>();
ElectorSetup es2 = new ElectorSetup(null);
for (int i = 1; i <= numShards; i++) {
ClientThread thread = new ClientThread(es2, "parshard" + i, 2, 40000 / (numShards - 1) /* delay enough to timeout or expire */);
threads.add(thread);
replica2s.add(thread);
}
startAndJoinElection(replica2s);
log.info("Second replicas brought up and registered");
// disconnect the leaders
es1.close();
for (int i = 1; i <= numShards; i ++) {
// if this test fails, getLeaderUrl will more likely throw an exception and fail the test,
// but add an assertEquals as well for good measure
assertEquals("2/", getLeaderUrl("collection1", "parshard" + i));
}
} finally {
// cleanup any threads still running
for (ClientThread thread : threads) {
thread.close();
thread.interrupt();
}
for (Thread thread : threads) {
thread.join();
}
}
}
private void waitForLeader(List<ClientThread> threads, int seq)
throws KeeperException, InterruptedException {
int leaderThread;
@ -334,7 +421,7 @@ public class LeaderElectionTest extends SolrTestCaseJ4 {
// start with a leader
ClientThread thread1 = null;
thread1 = new ClientThread(0);
thread1 = new ClientThread("shard1", 0);
threads.add(thread1);
scheduler.schedule(thread1, 0, TimeUnit.MILLISECONDS);
@ -348,7 +435,7 @@ public class LeaderElectionTest extends SolrTestCaseJ4 {
int launchIn = random().nextInt(500);
ClientThread thread = null;
try {
thread = new ClientThread(i);
thread = new ClientThread("shard1", i);
} catch (Exception e) {
//
}
@ -375,10 +462,7 @@ public class LeaderElectionTest extends SolrTestCaseJ4 {
}
try {
threads.get(j).close();
} catch (InterruptedException e) {
throw e;
} catch (Exception e) {
}
Thread.sleep(10);
@ -398,7 +482,7 @@ public class LeaderElectionTest extends SolrTestCaseJ4 {
int j;
j = random().nextInt(threads.size());
try {
threads.get(j).zkClient.getSolrZooKeeper().pauseCnxn(
threads.get(j).es.zkClient.getSolrZooKeeper().pauseCnxn(
ZkTestServer.TICK_TIME * 2);
} catch (Exception e) {
e.printStackTrace();
@ -436,7 +520,7 @@ public class LeaderElectionTest extends SolrTestCaseJ4 {
// cleanup any threads still running
for (ClientThread thread : threads) {
thread.zkClient.getSolrZooKeeper().close();
thread.es.zkClient.getSolrZooKeeper().close();
thread.close();
}

View File

@ -17,8 +17,8 @@ package org.apache.solr.cloud;
* the License.
*/
import java.io.File;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
@ -38,61 +38,72 @@ public class ZkSolrClientTest extends AbstractSolrTestCase {
public static void beforeClass() throws Exception {
initCore("solrconfig.xml", "schema.xml");
}
class ZkConnection implements AutoCloseable {
private ZkTestServer server = null;
private SolrZkClient zkClient = null;
ZkConnection() throws Exception {
this (true);
}
ZkConnection(boolean makeRoot) throws Exception {
String zkDir = createTempDir("zkData").getAbsolutePath();
server = new ZkTestServer(zkDir);
server.run();
AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
if (makeRoot) AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
zkClient = new SolrZkClient(server.getZkAddress(), AbstractZkTestCase.TIMEOUT);
}
public ZkTestServer getServer () {
return server;
}
public SolrZkClient getClient () {
return zkClient;
}
@Override
public void close() throws Exception {
if (zkClient != null) zkClient.close();
if (server != null) server.shutdown();
}
}
public void testConnect() throws Exception {
String zkDir = createTempDir("zkData").getAbsolutePath();
ZkTestServer server = null;
server = new ZkTestServer(zkDir);
server.run();
AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
SolrZkClient zkClient = new SolrZkClient(server.getZkAddress(), AbstractZkTestCase.TIMEOUT);
zkClient.close();
server.shutdown();
try (ZkConnection conn = new ZkConnection (false)) {
// do nothing
}
}
public void testMakeRootNode() throws Exception {
String zkDir = createTempDir("zkData").getAbsolutePath();
ZkTestServer server = null;
server = new ZkTestServer(zkDir);
server.run();
AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
SolrZkClient zkClient = new SolrZkClient(server.getZkHost(),
AbstractZkTestCase.TIMEOUT);
assertTrue(zkClient.exists("/solr", true));
zkClient.close();
server.shutdown();
try (ZkConnection conn = new ZkConnection ()) {
final SolrZkClient zkClient = new SolrZkClient(conn.getServer().getZkHost(), AbstractZkTestCase.TIMEOUT);
try {
assertTrue(zkClient.exists("/solr", true));
} finally {
zkClient.close();
}
}
}
public void testClean() throws Exception {
String zkDir = createTempDir("zkData").getAbsolutePath();
ZkTestServer server = null;
try (ZkConnection conn = new ZkConnection ()) {
final SolrZkClient zkClient = conn.getClient();
server = new ZkTestServer(zkDir);
server.run();
AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
zkClient.makePath("/test/path/here", true);
SolrZkClient zkClient = new SolrZkClient(server.getZkHost(),
AbstractZkTestCase.TIMEOUT);
zkClient.makePath("/zz/path/here", true);
zkClient.makePath("/test/path/here", true);
zkClient.makePath("/zz/path/here", true);
zkClient.clean("/");
assertFalse(zkClient.exists("/test", true));
assertFalse(zkClient.exists("/zz", true));
zkClient.clean("/");
zkClient.close();
server.shutdown();
assertFalse(zkClient.exists("/test", true));
assertFalse(zkClient.exists("/zz", true));
}
}
public void testReconnect() throws Exception {
@ -188,18 +199,44 @@ public class ZkSolrClientTest extends AbstractSolrTestCase {
}
}
public void testMultipleWatchesAsync() throws Exception {
try (ZkConnection conn = new ZkConnection ()) {
final SolrZkClient zkClient = conn.getClient();
zkClient.makePath("/collections", true);
final int numColls = random().nextInt(100);
final CountDownLatch latch = new CountDownLatch(numColls);
for (int i = 1; i <= numColls; i ++) {
String collPath = "/collections/collection" + i;
zkClient.makePath(collPath, true);
zkClient.getChildren(collPath, new Watcher() {
@Override
public void process(WatchedEvent event) {
latch.countDown();
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {}
}
}, true);
}
for (int i = 1; i <= numColls; i ++) {
String shardsPath = "/collections/collection" + i + "/shards";
zkClient.makePath(shardsPath, true);
}
assertTrue(latch.await(1000, TimeUnit.MILLISECONDS));
}
}
public void testWatchChildren() throws Exception {
String zkDir = createTempDir("zkData").getAbsolutePath();
final AtomicInteger cnt = new AtomicInteger();
ZkTestServer server = new ZkTestServer(zkDir);
server.run();
AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
Thread.sleep(400);
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
final SolrZkClient zkClient = new SolrZkClient(server.getZkAddress(), AbstractZkTestCase.TIMEOUT);
try {
try (ZkConnection conn = new ZkConnection ()) {
final SolrZkClient zkClient = conn.getClient();
final AtomicInteger cnt = new AtomicInteger();
final CountDownLatch latch = new CountDownLatch(1);
zkClient.makePath("/collections", true);
zkClient.getChildren("/collections", new Watcher() {
@ -248,14 +285,6 @@ public class ZkSolrClientTest extends AbstractSolrTestCase {
assertEquals(2, cnt.intValue());
} finally {
if (zkClient != null) {
zkClient.close();
}
if (server != null) {
server.shutdown();
}
}
}

View File

@ -24,6 +24,8 @@ import java.io.StringWriter;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import javax.xml.transform.OutputKeys;
@ -36,10 +38,13 @@ import javax.xml.transform.stream.StreamSource;
import org.apache.commons.io.FileUtils;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkClientConnectionStrategy.ZkUpdate;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
@ -72,10 +77,12 @@ public class SolrZkClient {
private ZkCmdExecutor zkCmdExecutor;
private final ExecutorService zkCallbackExecutor = Executors.newCachedThreadPool(new SolrjNamedThreadFactory("zkCallback"));
private volatile boolean isClosed = false;
private ZkClientConnectionStrategy zkClientConnectionStrategy;
private int zkClientTimeout;
public int getZkClientTimeout() {
return zkClientTimeout;
}
@ -183,6 +190,24 @@ public class SolrZkClient {
}
}
private Watcher wrapWatcher (final Watcher watcher) {
if (watcher == null) return watcher;
// wrap the watcher so that it doesn't fire off ZK's event queue
return new Watcher() {
@Override
public void process(final WatchedEvent event) {
log.debug("Submitting job to respond to event " + event);
zkCallbackExecutor.submit(new Runnable () {
@Override
public void run () {
watcher.process(event);
}
});
}
};
}
/**
* Return the stat of the node of the given path. Return null if no such a
* node exists.
@ -206,11 +231,11 @@ public class SolrZkClient {
return zkCmdExecutor.retryOperation(new ZkOperation() {
@Override
public Stat execute() throws KeeperException, InterruptedException {
return keeper.exists(path, watcher);
return keeper.exists(path, wrapWatcher(watcher));
}
});
} else {
return keeper.exists(path, watcher);
return keeper.exists(path, wrapWatcher(watcher));
}
}
@ -257,11 +282,11 @@ public class SolrZkClient {
return zkCmdExecutor.retryOperation(new ZkOperation() {
@Override
public List<String> execute() throws KeeperException, InterruptedException {
return keeper.getChildren(path, watcher);
return keeper.getChildren(path, wrapWatcher(watcher));
}
});
} else {
return keeper.getChildren(path, watcher);
return keeper.getChildren(path, wrapWatcher(watcher));
}
}
@ -274,11 +299,11 @@ public class SolrZkClient {
return zkCmdExecutor.retryOperation(new ZkOperation() {
@Override
public byte[] execute() throws KeeperException, InterruptedException {
return keeper.getData(path, watcher, stat);
return keeper.getData(path, wrapWatcher(watcher), stat);
}
});
} else {
return keeper.getData(path, watcher, stat);
return keeper.getData(path, wrapWatcher(watcher), stat);
}
}
@ -570,6 +595,7 @@ public class SolrZkClient {
closeKeeper(keeper);
} finally {
connManager.close();
closeCallbackExecutor();
}
numCloses.incrementAndGet();
}
@ -609,6 +635,14 @@ public class SolrZkClient {
}
}
private void closeCallbackExecutor() {
try {
ExecutorUtil.shutdownAndAwaitTermination(zkCallbackExecutor);
} catch (Exception e) {
SolrException.log(log, e);
}
}
// yeah, it's recursive :(
public void clean(String path) throws InterruptedException, KeeperException {
List<String> children;