SOLR-4199: When doing zk retries due to connectionloss, rather than just retrying for 2 minutes, retry in proportion to the session timeout.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1422391 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mark Robert Miller 2012-12-15 22:23:07 +00:00
parent a6b2552873
commit 307ebd17cb
6 changed files with 40 additions and 11 deletions

View File

@ -175,6 +175,10 @@ Optimizations
* SOLR-4063: Allow CoreContainer to load multiple SolrCores in parallel rather * SOLR-4063: Allow CoreContainer to load multiple SolrCores in parallel rather
than just serially. (Mark Miller) than just serially. (Mark Miller)
* SOLR-4199: When doing zk retries due to connectionloss, rather than just
retrying for 2 minutes, retry in proportion to the session timeout.
(Mark Miller)
Bug Fixes Bug Fixes
---------------------- ----------------------

View File

@ -62,10 +62,11 @@ public class LeaderElector {
protected SolrZkClient zkClient; protected SolrZkClient zkClient;
private ZkCmdExecutor zkCmdExecutor = new ZkCmdExecutor(); private ZkCmdExecutor zkCmdExecutor;
public LeaderElector(SolrZkClient zkClient) { public LeaderElector(SolrZkClient zkClient) {
this.zkClient = zkClient; this.zkClient = zkClient;
zkCmdExecutor = new ZkCmdExecutor((int) (zkClient.getZkClientTimeout()/1000.0 + 3000));
} }
/** /**

View File

@ -99,9 +99,7 @@ public final class ZkController {
private final DistributedQueue overseerJobQueue; private final DistributedQueue overseerJobQueue;
private final DistributedQueue overseerCollectionQueue; private final DistributedQueue overseerCollectionQueue;
// package private for tests public static final String CONFIGS_ZKNODE = "/configs";
static final String CONFIGS_ZKNODE = "/configs";
public final static String COLLECTION_PARAM_PREFIX="collection."; public final static String COLLECTION_PARAM_PREFIX="collection.";
public final static String CONFIGNAME_PROP="configName"; public final static String CONFIGNAME_PROP="configName";
@ -142,6 +140,8 @@ public final class ZkController {
private int clientTimeout; private int clientTimeout;
private volatile boolean isClosed;
public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, String localHost, String locaHostPort, public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, String localHost, String locaHostPort,
String localHostContext, String leaderVoteWait, final CurrentCoreDescriptorProvider registerOnReconnect) throws InterruptedException, String localHostContext, String leaderVoteWait, final CurrentCoreDescriptorProvider registerOnReconnect) throws InterruptedException,
TimeoutException, IOException { TimeoutException, IOException {
@ -241,7 +241,7 @@ public final class ZkController {
this.overseerJobQueue = Overseer.getInQueue(zkClient); this.overseerJobQueue = Overseer.getInQueue(zkClient);
this.overseerCollectionQueue = Overseer.getCollectionQueue(zkClient); this.overseerCollectionQueue = Overseer.getCollectionQueue(zkClient);
cmdExecutor = new ZkCmdExecutor(); cmdExecutor = new ZkCmdExecutor(zkClientTimeout);
leaderElector = new LeaderElector(zkClient); leaderElector = new LeaderElector(zkClient);
zkStateReader = new ZkStateReader(zkClient); zkStateReader = new ZkStateReader(zkClient);
@ -266,6 +266,9 @@ public final class ZkController {
descriptor.getCloudDescriptor().isLeader = false; descriptor.getCloudDescriptor().isLeader = false;
publish(descriptor, ZkStateReader.DOWN, updateLastPublished); publish(descriptor, ZkStateReader.DOWN, updateLastPublished);
} catch (Exception e) { } catch (Exception e) {
if (isClosed) {
return;
}
try { try {
Thread.sleep(1000); Thread.sleep(1000);
} catch (InterruptedException e1) { } catch (InterruptedException e1) {
@ -282,6 +285,9 @@ public final class ZkController {
waitForLeaderToSeeDownState(descriptor, coreZkNodeName); waitForLeaderToSeeDownState(descriptor, coreZkNodeName);
} catch (Exception e) { } catch (Exception e) {
SolrException.log(log, "", e); SolrException.log(log, "", e);
if (isClosed) {
return;
}
try { try {
Thread.sleep(5000); Thread.sleep(5000);
} catch (InterruptedException e1) { } catch (InterruptedException e1) {
@ -307,6 +313,7 @@ public final class ZkController {
* Closes the underlying ZooKeeper client. * Closes the underlying ZooKeeper client.
*/ */
public void close() { public void close() {
this.isClosed = true;
if (cmdDistribExecutor != null) { if (cmdDistribExecutor != null) {
try { try {

View File

@ -71,11 +71,16 @@ public class SolrZkClient {
private volatile SolrZooKeeper keeper; private volatile SolrZooKeeper keeper;
private ZkCmdExecutor zkCmdExecutor = new ZkCmdExecutor(); private ZkCmdExecutor zkCmdExecutor;
private volatile boolean isClosed = false; private volatile boolean isClosed = false;
private ZkClientConnectionStrategy zkClientConnectionStrategy; private ZkClientConnectionStrategy zkClientConnectionStrategy;
private int zkClientTimeout;
public int getZkClientTimeout() {
return zkClientTimeout;
}
public SolrZkClient(String zkServerAddress, int zkClientTimeout) { public SolrZkClient(String zkServerAddress, int zkClientTimeout) {
this(zkServerAddress, zkClientTimeout, new DefaultConnectionStrategy(), null); this(zkServerAddress, zkClientTimeout, new DefaultConnectionStrategy(), null);
} }
@ -92,6 +97,9 @@ public class SolrZkClient {
public SolrZkClient(String zkServerAddress, int zkClientTimeout, public SolrZkClient(String zkServerAddress, int zkClientTimeout,
ZkClientConnectionStrategy strat, final OnReconnect onReconnect, int clientConnectTimeout) { ZkClientConnectionStrategy strat, final OnReconnect onReconnect, int clientConnectTimeout) {
this.zkClientConnectionStrategy = strat; this.zkClientConnectionStrategy = strat;
this.zkClientTimeout = zkClientTimeout;
// we must retry at least as long as the session timeout
zkCmdExecutor = new ZkCmdExecutor(zkClientTimeout);
connManager = new ConnectionManager("ZooKeeperConnection Watcher:" connManager = new ConnectionManager("ZooKeeperConnection Watcher:"
+ zkServerAddress, this, zkServerAddress, zkClientTimeout, strat, onReconnect); + zkServerAddress, this, zkServerAddress, zkClientTimeout, strat, onReconnect);
try { try {

View File

@ -27,11 +27,13 @@ import org.apache.zookeeper.data.ACL;
public class ZkCmdExecutor { public class ZkCmdExecutor {
private long retryDelay = 1000L; private long retryDelay = 1300L; // 300 ms over for padding
private int retryCount = 15; private int retryCount;
private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
public ZkCmdExecutor() { public ZkCmdExecutor(int timeoutms) {
double timeouts = timeoutms / 1000.0;
this.retryCount = Math.round(0.5f * ((float)Math.sqrt(8.0f * timeouts + 1.0f) - 1.0f));
} }
public List<ACL> getAcl() { public List<ACL> getAcl() {

View File

@ -127,14 +127,16 @@ public class ZkStateReader {
private boolean closeClient = false; private boolean closeClient = false;
private ZkCmdExecutor cmdExecutor = new ZkCmdExecutor(); private ZkCmdExecutor cmdExecutor;
public ZkStateReader(SolrZkClient zkClient) { public ZkStateReader(SolrZkClient zkClient) {
this.zkClient = zkClient; this.zkClient = zkClient;
initZkCmdExecutor(zkClient.getZkClientTimeout());
} }
public ZkStateReader(String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout) throws InterruptedException, TimeoutException, IOException { public ZkStateReader(String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout) throws InterruptedException, TimeoutException, IOException {
closeClient = true; closeClient = true;
initZkCmdExecutor(zkClientTimeout);
zkClient = new SolrZkClient(zkServerAddress, zkClientTimeout, zkClientConnectTimeout, zkClient = new SolrZkClient(zkServerAddress, zkClientTimeout, zkClientConnectTimeout,
// on reconnect, reload cloud info // on reconnect, reload cloud info
new OnReconnect() { new OnReconnect() {
@ -159,6 +161,11 @@ public class ZkStateReader {
}); });
} }
private void initZkCmdExecutor(int zkClientTimeout) {
// we must retry at least as long as the session timeout
cmdExecutor = new ZkCmdExecutor(zkClientTimeout);
}
// load and publish a new CollectionInfo // load and publish a new CollectionInfo
public void updateClusterState(boolean immediate) throws KeeperException, InterruptedException { public void updateClusterState(boolean immediate) throws KeeperException, InterruptedException {
updateClusterState(immediate, false); updateClusterState(immediate, false);