SOLR-5436: Eliminate the 1500ms wait in overseer loop as well as polling the ZK distributed queue.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1544255 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2013-11-21 17:28:36 +00:00
parent a5c6c6ddd6
commit 21afda7b22
11 changed files with 189 additions and 79 deletions

View File

@ -135,6 +135,9 @@ Optimizations
* SOLR-5458: Admin UI - Remove separated Pages for Config & Schema (steffkes)
* SOLR-5436: Eliminate the 1500ms wait in overseer loop as well as
polling the ZK distributed queue. (Noble Paul, Mark Miller)
Other Changes
---------------------

View File

@ -115,7 +115,7 @@ public class DistributedQueue {
*
* @return the data at the head of the queue.
*/
private QueueEvent element() throws NoSuchElementException, KeeperException,
private QueueEvent element() throws KeeperException,
InterruptedException {
TreeMap<Long,String> orderedChildren;
@ -130,9 +130,9 @@ public class DistributedQueue {
try {
orderedChildren = orderedChildren(null);
} catch (KeeperException.NoNodeException e) {
throw new NoSuchElementException();
return null;
}
if (orderedChildren.size() == 0) throw new NoSuchElementException();
if (orderedChildren.size() == 0) return null;
for (String headNode : orderedChildren.values()) {
if (headNode != null) {
@ -208,7 +208,7 @@ public class DistributedQueue {
@Override
public void process(WatchedEvent event) {
LOG.info("Watcher fired on path: " + event.getPath() + " state: "
LOG.info("LatchChildWatcher fired on path: " + event.getPath() + " state: "
+ event.getState() + " type " + event.getType());
synchronized (lock) {
this.event = event;
@ -322,11 +322,9 @@ public class DistributedQueue {
* @return data at the first element of the queue, or null.
*/
public byte[] peek() throws KeeperException, InterruptedException {
try {
return element().getBytes();
} catch (NoSuchElementException e) {
return null;
}
QueueEvent element = element();
if(element == null) return null;
return element.getBytes();
}
public static class QueueEvent {
@ -384,16 +382,29 @@ public class DistributedQueue {
/**
* Returns the data at the first element of the queue, or null if the queue is
* empty.
* empty and block is false.
*
* @param block if true, blocks until an element enters the queue
* @return data at the first element of the queue, or null.
*/
public QueueEvent peek(boolean block) throws KeeperException, InterruptedException {
if (!block) {
return peek(block ? Long.MAX_VALUE : 0);
}
/**
* Returns the data at the first element of the queue, or null if the queue is
* empty after wait ms.
*
* @param wait max wait time in ms.
* @return data at the first element of the queue, or null.
*/
public QueueEvent peek(long wait) throws KeeperException, InterruptedException {
if (wait == 0) {
return element();
}
TreeMap<Long,String> orderedChildren;
boolean waitedEnough = false;
while (true) {
LatchChildWatcher childWatcher = new LatchChildWatcher();
try {
@ -402,11 +413,15 @@ public class DistributedQueue {
zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT, true);
continue;
}
if(waitedEnough) {
if(orderedChildren.isEmpty()) return null;
}
if (orderedChildren.size() == 0) {
childWatcher.await(DEFAULT_TIMEOUT);
childWatcher.await(wait == Long.MAX_VALUE ? DEFAULT_TIMEOUT: wait);
waitedEnough = wait != Long.MAX_VALUE;
continue;
}
for (String headNode : orderedChildren.values()) {
String path = dir + "/" + headNode;
try {

View File

@ -71,6 +71,10 @@ public abstract class ElectionContext {
}
abstract void runLeaderProcess(boolean weAreReplacement) throws KeeperException, InterruptedException, IOException;
public void checkIfIamLeaderFired() {}
public void joinedElectionFired() {}
}
class ShardLeaderElectionContextBase extends ElectionContext {
@ -438,4 +442,15 @@ final class OverseerElectionContext extends ElectionContext {
overseer.start(id);
}
@Override
public void joinedElectionFired() {
overseer.close();
}
@Override
public void checkIfIamLeaderFired() {
// leader changed - close the overseer
overseer.close();
}
}

View File

@ -87,6 +87,7 @@ public class LeaderElector {
*/
private void checkIfIamLeader(final int seq, final ElectionContext context, boolean replacement) throws KeeperException,
InterruptedException, IOException {
context.checkIfIamLeaderFired();
// get all other numbers...
final String holdElectionPath = context.electionPath + ELECTION_NODE;
List<String> seqs = zkClient.getChildren(holdElectionPath, null, true);
@ -216,6 +217,8 @@ public class LeaderElector {
* @return sequential node number
*/
public int joinElection(ElectionContext context, boolean replacement) throws KeeperException, InterruptedException, IOException {
context.joinedElectionFired();
final String shardsElectZkPath = context.electionPath + LeaderElector.ELECTION_NODE;
long sessionId = zkClient.getSolrZooKeeper().getSessionId();

View File

@ -63,6 +63,8 @@ public class Overseer {
static enum LeaderStatus { DONT_KNOW, NO, YES };
private long lastUpdatedTime = 0;
private class ClusterStateUpdater implements Runnable, ClosableThread {
private final ZkStateReader reader;
@ -151,33 +153,51 @@ public class Overseer {
break;
}
else if (LeaderStatus.YES != isLeader) {
log.debug("am_i_leader unclear {}", isLeader);
log.debug("am_i_leader unclear {}", isLeader);
continue; // not a no, not a yes, try ask again
}
DistributedQueue.QueueEvent head = null;
try {
head = stateUpdateQueue.peek(true);
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
log.warn(
"Solr cannot talk to ZK, exiting Overseer main queue loop", e);
return;
}
log.error("Exception in Overseer main queue loop", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
} catch (Exception e) {
log.error("Exception in Overseer main queue loop", e);
}
synchronized (reader.getUpdateLock()) {
try {
byte[] head = stateUpdateQueue.peek();
if (head != null) {
reader.updateClusterState(true);
ClusterState clusterState = reader.getClusterState();
reader.updateClusterState(true);
ClusterState clusterState = reader.getClusterState();
while (head != null) {
final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
final String operation = message.getStr(QUEUE_OPERATION);
clusterState = processMessage(clusterState, message, operation);
workQueue.offer(head.getBytes());
stateUpdateQueue.poll();
if (System.currentTimeMillis() - lastUpdatedTime > STATE_UPDATE_DELAY) break;
while (head != null) {
final ZkNodeProps message = ZkNodeProps.load(head);
final String operation = message.getStr(QUEUE_OPERATION);
clusterState = processMessage(clusterState, message, operation);
workQueue.offer(head);
stateUpdateQueue.poll();
head = stateUpdateQueue.peek();
}
zkClient.setData(ZkStateReader.CLUSTER_STATE,
ZkStateReader.toJSON(clusterState), true);
// if an event comes in the next 100ms batch it together
head = stateUpdateQueue.peek(100);
}
lastUpdatedTime = System.currentTimeMillis();
zkClient.setData(ZkStateReader.CLUSTER_STATE,
ZkStateReader.toJSON(clusterState), true);
// clean work queue
while (workQueue.poll() != null);
while (workQueue.poll() != null) ;
} catch (KeeperException e) {
if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
log.warn("Solr cannot talk to ZK, exiting Overseer main queue loop", e);
@ -193,11 +213,6 @@ public class Overseer {
}
}
try {
Thread.sleep(STATE_UPDATE_DELAY);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
@ -939,8 +954,6 @@ public class Overseer {
private volatile OverseerThread updaterThread;
private volatile boolean isClosed;
private ZkStateReader reader;
private ShardHandler shardHandler;
@ -954,6 +967,7 @@ public class Overseer {
}
public void start(String id) {
close();
log.info("Overseer (id=" + id + ") starting");
createOverseerNode(reader.getZkClient());
//launch cluster state updater thread
@ -975,7 +989,6 @@ public class Overseer {
}
public void close() {
isClosed = true;
if (updaterThread != null) {
try {
updaterThread.close();
@ -992,12 +1005,8 @@ public class Overseer {
log.error("Error closing ccThread", t);
}
}
try {
reader.close();
} catch (Throwable t) {
log.error("Error closing zkStateReader", t);
}
updaterThread = null;
ccThread = null;
}
/**

View File

@ -292,9 +292,8 @@ public class OverseerCollectionProcessor implements Runnable, ClosableThread {
deleteCoreNode(collectionName, replicaName, replica, core);
if(waitForCoreNodeGone(collectionName, shard, replicaName)) return;
} else {
Map m = ZkNodeProps.makeMap("qt", adminPath,
CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString(),
CoreAdminParams.CORE, core) ;
Map m = ZkNodeProps.makeMap("qt", adminPath, CoreAdminParams.ACTION,
CoreAdminAction.UNLOAD.toString(), CoreAdminParams.CORE, core);
ShardRequest sreq = new ShardRequest();
sreq.purpose = 1;

View File

@ -23,10 +23,10 @@ import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.BeforeReconnect;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DefaultConnectionStrategy;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.OnReconnect;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
@ -203,46 +203,53 @@ public final class ZkController {
this.leaderVoteWait = leaderVoteWait;
this.clientTimeout = zkClientTimeout;
zkClient = new SolrZkClient(zkServerAddress, zkClientTimeout, zkClientConnectTimeout,
zkClient = new SolrZkClient(zkServerAddress, zkClientTimeout,
zkClientConnectTimeout, new DefaultConnectionStrategy(),
// on reconnect, reload cloud info
new OnReconnect() {
@Override
public void command() {
try {
markAllAsNotLeader(registerOnReconnect);
// this is troublesome - we dont want to kill anything the old leader accepted
// though I guess sync will likely get those updates back? But only if
// this is troublesome - we dont want to kill anything the old
// leader accepted
// though I guess sync will likely get those updates back? But
// only if
// he is involved in the sync, and he certainly may not be
// ExecutorUtil.shutdownAndAwaitTermination(cc.getCmdDistribExecutor());
// ExecutorUtil.shutdownAndAwaitTermination(cc.getCmdDistribExecutor());
// we need to create all of our lost watches
// seems we dont need to do this again...
//Overseer.createClientNodes(zkClient, getNodeName());
// Overseer.createClientNodes(zkClient, getNodeName());
ShardHandler shardHandler;
String adminPath;
shardHandler = cc.getShardHandlerFactory().getShardHandler();
adminPath = cc.getAdminPath();
cc.cancelCoreRecoveries();
registerAllCoresAsDown(registerOnReconnect, false);
ZkController.this.overseer = new Overseer(shardHandler, adminPath, zkStateReader);
ElectionContext context = new OverseerElectionContext(zkClient, overseer, getNodeName());
ElectionContext context = new OverseerElectionContext(zkClient,
overseer, getNodeName());
overseerElector.joinElection(context, true);
zkStateReader.createClusterStateWatchersAndUpdate();
// we have to register as live first to pick up docs in the buffer
createEphemeralLiveNode();
List<CoreDescriptor> descriptors = registerOnReconnect.getCurrentDescriptors();
List<CoreDescriptor> descriptors = registerOnReconnect
.getCurrentDescriptors();
// re register all descriptors
if (descriptors != null) {
if (descriptors != null) {
for (CoreDescriptor descriptor : descriptors) {
// TODO: we need to think carefully about what happens when it was
// a leader that was expired - as well as what to do about leaders/overseers
// TODO: we need to think carefully about what happens when it
// was
// a leader that was expired - as well as what to do about
// leaders/overseers
// with connection loss
try {
register(descriptor.getName(), descriptor, true, true);
@ -251,7 +258,7 @@ public final class ZkController {
}
}
}
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
@ -262,10 +269,18 @@ public final class ZkController {
throw new ZooKeeperException(
SolrException.ErrorCode.SERVER_ERROR, "", e);
}
}
}, new BeforeReconnect() {
@Override
public void command() {
try {
ZkController.this.overseer.close();
} catch (Exception e) {
log.error("Error trying to stop any Overseer threads", e);
}
}
});
this.overseerJobQueue = Overseer.getInQueue(zkClient);

View File

@ -19,9 +19,11 @@ package org.apache.solr.cloud;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
@ -47,6 +49,7 @@ import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -57,7 +60,9 @@ public class OverseerTest extends SolrTestCaseJ4 {
static final int TIMEOUT = 10000;
private static final boolean DEBUG = false;
private List<Overseer> overseers = new ArrayList<Overseer>();
private List<ZkStateReader> readers = new ArrayList<ZkStateReader>();
public static class MockZKController{
@ -182,6 +187,19 @@ public class OverseerTest extends SolrTestCaseJ4 {
initCore();
Thread.sleep(3000); //XXX wait for threads to die...
}
@After
public void tearDown() throws Exception {
super.tearDown();
for (Overseer overseer : overseers) {
overseer.close();
}
overseers.clear();
for (ZkStateReader reader : readers) {
reader.close();
}
readers.clear();
}
@Test
public void testShardAssignment() throws Exception {
@ -884,10 +902,12 @@ public class OverseerTest extends SolrTestCaseJ4 {
KeeperException, ParserConfigurationException, SAXException {
SolrZkClient zkClient = new SolrZkClient(address, TIMEOUT);
ZkStateReader reader = new ZkStateReader(zkClient);
readers.add(reader);
LeaderElector overseerElector = new LeaderElector(zkClient);
// TODO: close Overseer
Overseer overseer = new Overseer(
new HttpShardHandlerFactory().getShardHandler(), "/admin/cores", reader);
overseers.add(overseer);
ElectionContext ec = new OverseerElectionContext(zkClient, overseer, address.replaceAll("/", "_"));
overseerElector.setup(ec);
overseerElector.joinElection(ec, false);

View File

@ -0,0 +1,22 @@
package org.apache.solr.common.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
public interface BeforeReconnect {
public void command();
}

View File

@ -45,16 +45,18 @@ class ConnectionManager implements Watcher {
private final SolrZkClient client;
private final OnReconnect onReconnect;
private final BeforeReconnect beforeReconnect;
private volatile boolean isClosed = false;
public ConnectionManager(String name, SolrZkClient client, String zkServerAddress, int zkClientTimeout, ZkClientConnectionStrategy strat, OnReconnect onConnect) {
public ConnectionManager(String name, SolrZkClient client, String zkServerAddress, int zkClientTimeout, ZkClientConnectionStrategy strat, OnReconnect onConnect, BeforeReconnect beforeReconnect) {
this.name = name;
this.client = client;
this.connectionStrategy = strat;
this.zkServerAddress = zkServerAddress;
this.zkClientTimeout = zkClientTimeout;
this.onReconnect = onConnect;
this.beforeReconnect = beforeReconnect;
reset();
}
@ -84,7 +86,9 @@ class ConnectionManager implements Watcher {
} else if (state == KeeperState.Expired) {
connected = false;
log.info("Our previous ZooKeeper session was expired. Attempting to reconnect to recover relationship with ZooKeeper...");
if (beforeReconnect != null) {
beforeReconnect.command();
}
try {
connectionStrategy.reconnect(zkServerAddress, zkClientTimeout, this,
new ZkClientConnectionStrategy.ZkUpdate() {

View File

@ -85,22 +85,27 @@ public class SolrZkClient {
}
public SolrZkClient(String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, OnReconnect onReonnect) {
this(zkServerAddress, zkClientTimeout, new DefaultConnectionStrategy(), onReonnect, zkClientConnectTimeout);
this(zkServerAddress, zkClientTimeout, zkClientConnectTimeout, new DefaultConnectionStrategy(), onReonnect);
}
public SolrZkClient(String zkServerAddress, int zkClientTimeout,
ZkClientConnectionStrategy strat, final OnReconnect onReconnect) {
this(zkServerAddress, zkClientTimeout, strat, onReconnect, DEFAULT_CLIENT_CONNECT_TIMEOUT);
this(zkServerAddress, zkClientTimeout, DEFAULT_CLIENT_CONNECT_TIMEOUT, strat, onReconnect);
}
public SolrZkClient(String zkServerAddress, int zkClientTimeout, int clientConnectTimeout,
ZkClientConnectionStrategy strat, final OnReconnect onReconnect) {
this(zkServerAddress, zkClientTimeout, clientConnectTimeout, strat, onReconnect, null);
}
public SolrZkClient(String zkServerAddress, int zkClientTimeout,
ZkClientConnectionStrategy strat, final OnReconnect onReconnect, int clientConnectTimeout) {
public SolrZkClient(String zkServerAddress, int zkClientTimeout, int clientConnectTimeout,
ZkClientConnectionStrategy strat, final OnReconnect onReconnect, BeforeReconnect beforeReconnect) {
this.zkClientConnectionStrategy = strat;
this.zkClientTimeout = zkClientTimeout;
// we must retry at least as long as the session timeout
zkCmdExecutor = new ZkCmdExecutor(zkClientTimeout);
connManager = new ConnectionManager("ZooKeeperConnection Watcher:"
+ zkServerAddress, this, zkServerAddress, zkClientTimeout, strat, onReconnect);
+ zkServerAddress, this, zkServerAddress, zkClientTimeout, strat, onReconnect, beforeReconnect);
try {
strat.connect(zkServerAddress, zkClientTimeout, connManager,
new ZkUpdate() {