mirror of https://github.com/apache/lucene.git
SOLR-5577: Likely ZooKeeper expiration should not slow down updates a given amount, but instead cut off updates after a given time.
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1553912 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
49209e1d16
commit
cd9e1129ec
|
@ -216,6 +216,10 @@ Bug Fixes
|
|||
|
||||
* SOLR-5567: ZkController getHostAddress duplicates url prefix.
|
||||
(Kyle Halliday, Alexey Serba, shalin)
|
||||
|
||||
* SOLR-5577: Likely ZooKeeper expiration should not slow down updates a given
|
||||
amount, but instead cut off updates after a given time.
|
||||
(Mark Miller, Christine Poerschke)
|
||||
|
||||
Optimizations
|
||||
----------------------
|
||||
|
|
|
@ -1271,22 +1271,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
|
|||
return;
|
||||
}
|
||||
|
||||
if (zkController.isConnected()) {
|
||||
if (!zkController.getZkClient().getConnectionManager().isLikelyExpired()) {
|
||||
return;
|
||||
}
|
||||
|
||||
long timeoutAt = System.currentTimeMillis() + zkController.getClientTimeout();
|
||||
while (System.currentTimeMillis() < timeoutAt) {
|
||||
if (zkController.isConnected()) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
break;
|
||||
}
|
||||
}
|
||||
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Cannot talk to ZooKeeper - Updates are disabled.");
|
||||
}
|
||||
|
||||
|
|
|
@ -17,6 +17,8 @@ package org.apache.solr.common.cloud;
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
|
@ -27,7 +29,7 @@ import org.apache.zookeeper.Watcher.Event.KeeperState;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
class ConnectionManager implements Watcher {
|
||||
public class ConnectionManager implements Watcher {
|
||||
protected static final Logger log = LoggerFactory
|
||||
.getLogger(ConnectionManager.class);
|
||||
|
||||
|
@ -35,26 +37,26 @@ class ConnectionManager implements Watcher {
|
|||
private CountDownLatch clientConnected;
|
||||
private KeeperState state;
|
||||
private boolean connected;
|
||||
private boolean likelyExpired = true;
|
||||
|
||||
private final ZkClientConnectionStrategy connectionStrategy;
|
||||
|
||||
private final String zkServerAddress;
|
||||
|
||||
private final int zkClientTimeout;
|
||||
|
||||
private final SolrZkClient client;
|
||||
|
||||
private final OnReconnect onReconnect;
|
||||
private final BeforeReconnect beforeReconnect;
|
||||
|
||||
private volatile boolean isClosed = false;
|
||||
|
||||
private volatile Timer disconnectedTimer;
|
||||
|
||||
public ConnectionManager(String name, SolrZkClient client, String zkServerAddress, int zkClientTimeout, ZkClientConnectionStrategy strat, OnReconnect onConnect, BeforeReconnect beforeReconnect) {
|
||||
public ConnectionManager(String name, SolrZkClient client, String zkServerAddress, ZkClientConnectionStrategy strat, OnReconnect onConnect, BeforeReconnect beforeReconnect) {
|
||||
this.name = name;
|
||||
this.client = client;
|
||||
this.connectionStrategy = strat;
|
||||
this.zkServerAddress = zkServerAddress;
|
||||
this.zkClientTimeout = zkClientTimeout;
|
||||
this.onReconnect = onConnect;
|
||||
this.beforeReconnect = beforeReconnect;
|
||||
reset();
|
||||
|
@ -63,6 +65,35 @@ class ConnectionManager implements Watcher {
|
|||
private synchronized void reset() {
|
||||
clientConnected = new CountDownLatch(1);
|
||||
state = KeeperState.Disconnected;
|
||||
disconnected();
|
||||
}
|
||||
|
||||
private synchronized void connected() {
|
||||
connected = true;
|
||||
if (disconnectedTimer != null) {
|
||||
disconnectedTimer.cancel();
|
||||
disconnectedTimer = null;
|
||||
}
|
||||
likelyExpired = false;
|
||||
}
|
||||
|
||||
private synchronized void disconnected() {
|
||||
if (disconnectedTimer != null) {
|
||||
disconnectedTimer.cancel();
|
||||
disconnectedTimer = null;
|
||||
}
|
||||
|
||||
disconnectedTimer = new Timer();
|
||||
disconnectedTimer.schedule(new TimerTask() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (ConnectionManager.this) {
|
||||
likelyExpired = true;
|
||||
}
|
||||
}
|
||||
|
||||
}, (long) (client.getZkClientTimeout() * 0.90));
|
||||
connected = false;
|
||||
}
|
||||
|
||||
|
@ -80,17 +111,17 @@ class ConnectionManager implements Watcher {
|
|||
|
||||
state = event.getState();
|
||||
if (state == KeeperState.SyncConnected) {
|
||||
connected = true;
|
||||
connected();
|
||||
clientConnected.countDown();
|
||||
connectionStrategy.connected();
|
||||
} else if (state == KeeperState.Expired) {
|
||||
connected = false;
|
||||
disconnected();
|
||||
log.info("Our previous ZooKeeper session was expired. Attempting to reconnect to recover relationship with ZooKeeper...");
|
||||
if (beforeReconnect != null) {
|
||||
beforeReconnect.command();
|
||||
}
|
||||
try {
|
||||
connectionStrategy.reconnect(zkServerAddress, zkClientTimeout, this,
|
||||
connectionStrategy.reconnect(zkServerAddress, client.getZkClientTimeout(), this,
|
||||
new ZkClientConnectionStrategy.ZkUpdate() {
|
||||
@Override
|
||||
public void update(SolrZooKeeper keeper) {
|
||||
|
@ -118,9 +149,7 @@ class ConnectionManager implements Watcher {
|
|||
onReconnect.command();
|
||||
}
|
||||
|
||||
synchronized (ConnectionManager.this) {
|
||||
ConnectionManager.this.connected = true;
|
||||
}
|
||||
connected();
|
||||
|
||||
}
|
||||
});
|
||||
|
@ -130,10 +159,10 @@ class ConnectionManager implements Watcher {
|
|||
log.info("Connected:" + connected);
|
||||
} else if (state == KeeperState.Disconnected) {
|
||||
log.info("zkClient has disconnected");
|
||||
connected = false;
|
||||
disconnected();
|
||||
connectionStrategy.disconnected();
|
||||
} else {
|
||||
connected = false;
|
||||
disconnected();
|
||||
}
|
||||
notifyAll();
|
||||
}
|
||||
|
@ -146,11 +175,19 @@ class ConnectionManager implements Watcher {
|
|||
// to avoid deadlock on shutdown
|
||||
public void close() {
|
||||
this.isClosed = true;
|
||||
if (this.disconnectedTimer != null) {
|
||||
this.disconnectedTimer.cancel();
|
||||
this.disconnectedTimer = null;
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized KeeperState state() {
|
||||
return state;
|
||||
}
|
||||
|
||||
public synchronized boolean isLikelyExpired() {
|
||||
return likelyExpired;
|
||||
}
|
||||
|
||||
public synchronized void waitForConnected(long waitForConnection)
|
||||
throws TimeoutException {
|
||||
|
|
|
@ -105,7 +105,7 @@ public class SolrZkClient {
|
|||
// we must retry at least as long as the session timeout
|
||||
zkCmdExecutor = new ZkCmdExecutor(zkClientTimeout);
|
||||
connManager = new ConnectionManager("ZooKeeperConnection Watcher:"
|
||||
+ zkServerAddress, this, zkServerAddress, zkClientTimeout, strat, onReconnect, beforeReconnect);
|
||||
+ zkServerAddress, this, zkServerAddress, strat, onReconnect, beforeReconnect);
|
||||
try {
|
||||
strat.connect(zkServerAddress, zkClientTimeout, connManager,
|
||||
new ZkUpdate() {
|
||||
|
@ -149,6 +149,10 @@ public class SolrZkClient {
|
|||
numOpens.incrementAndGet();
|
||||
}
|
||||
|
||||
public ConnectionManager getConnectionManager() {
|
||||
return connManager;
|
||||
}
|
||||
|
||||
public ZkClientConnectionStrategy getZkClientConnectionStrategy() {
|
||||
return zkClientConnectionStrategy;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue