mirror of https://github.com/apache/lucene.git
SOLR-5615: Deadlock while trying to recover after a ZK session expiration.
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1556572 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
df74150a50
commit
4f72365ba2
|
@ -347,6 +347,10 @@ Bug Fixes
|
|||
* SOLR-5608: Don't allow a closed SolrCore to publish state to ZooKeeper.
|
||||
(Mark Miller, Shawn Heisey)
|
||||
|
||||
* SOLR-5615: Deadlock while trying to recover after a ZK session expiration.
|
||||
(Ramkumar Aiyengar, Mark Miller)
|
||||
|
||||
|
||||
Optimizations
|
||||
----------------------
|
||||
|
||||
|
|
|
@ -458,6 +458,11 @@ final class OverseerElectionContext extends ElectionContext {
|
|||
overseer.start(id);
|
||||
}
|
||||
|
||||
public void cancelElection() throws InterruptedException, KeeperException {
|
||||
super.cancelElection();
|
||||
overseer.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void joinedElectionFired() {
|
||||
overseer.close();
|
||||
|
|
|
@ -64,7 +64,6 @@ public class LeaderElector {
|
|||
|
||||
private ZkCmdExecutor zkCmdExecutor;
|
||||
|
||||
// for tests
|
||||
private volatile ElectionContext context;
|
||||
|
||||
public LeaderElector(SolrZkClient zkClient) {
|
||||
|
@ -72,7 +71,6 @@ public class LeaderElector {
|
|||
zkCmdExecutor = new ZkCmdExecutor(zkClient.getZkClientTimeout());
|
||||
}
|
||||
|
||||
// for tests
|
||||
public ElectionContext getContext() {
|
||||
return context;
|
||||
}
|
||||
|
|
|
@ -17,6 +17,28 @@ package org.apache.solr.cloud;
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.NetworkInterface;
|
||||
import java.net.URLEncoder;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Enumeration;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.solr.client.solrj.impl.HttpSolrServer;
|
||||
|
@ -42,7 +64,6 @@ import org.apache.solr.core.CoreDescriptor;
|
|||
import org.apache.solr.core.SolrCore;
|
||||
import org.apache.solr.handler.component.ShardHandler;
|
||||
import org.apache.solr.update.UpdateLog;
|
||||
import org.apache.solr.update.UpdateShardHandler;
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.KeeperException.NoNodeException;
|
||||
|
@ -51,28 +72,6 @@ import org.apache.zookeeper.data.Stat;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.NetworkInterface;
|
||||
import java.net.URLEncoder;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Enumeration;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
/**
|
||||
* Handle ZooKeeper interactions.
|
||||
*
|
||||
|
@ -140,13 +139,13 @@ public final class ZkController {
|
|||
}
|
||||
private final Map<ContextKey, ElectionContext> electionContexts = Collections.synchronizedMap(new HashMap<ContextKey, ElectionContext>());
|
||||
|
||||
private SolrZkClient zkClient;
|
||||
private ZkCmdExecutor cmdExecutor;
|
||||
private ZkStateReader zkStateReader;
|
||||
private final SolrZkClient zkClient;
|
||||
private final ZkCmdExecutor cmdExecutor;
|
||||
private final ZkStateReader zkStateReader;
|
||||
|
||||
private LeaderElector leaderElector;
|
||||
private final LeaderElector leaderElector;
|
||||
|
||||
private String zkServerAddress; // example: 127.0.0.1:54062/solr
|
||||
private final String zkServerAddress; // example: 127.0.0.1:54062/solr
|
||||
|
||||
private final String localHostPort; // example: 54065
|
||||
private final String localHostContext; // example: solr
|
||||
|
@ -227,6 +226,11 @@ public final class ZkController {
|
|||
ElectionContext context = new OverseerElectionContext(zkClient,
|
||||
overseer, getNodeName());
|
||||
|
||||
ElectionContext prevContext = overseerElector.getContext();
|
||||
if (prevContext != null) {
|
||||
prevContext.cancelElection();
|
||||
}
|
||||
|
||||
overseerElector.joinElection(context, true);
|
||||
zkStateReader.createClusterStateWatchersAndUpdate();
|
||||
|
||||
|
@ -919,6 +923,17 @@ public final class ZkController {
|
|||
|
||||
|
||||
private void joinElection(CoreDescriptor cd, boolean afterExpiration) throws InterruptedException, KeeperException, IOException {
|
||||
// look for old context - if we find it, cancel it
|
||||
String collection = cd.getCloudDescriptor().getCollectionName();
|
||||
final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
|
||||
|
||||
ContextKey contextKey = new ContextKey(collection, coreNodeName);
|
||||
|
||||
ElectionContext prevContext = electionContexts.get(contextKey);
|
||||
|
||||
if (prevContext != null) {
|
||||
prevContext.cancelElection();
|
||||
}
|
||||
|
||||
String shardId = cd.getCloudDescriptor().getShardId();
|
||||
|
||||
|
@ -928,16 +943,15 @@ public final class ZkController {
|
|||
props.put(ZkStateReader.CORE_NAME_PROP, cd.getName());
|
||||
props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
|
||||
|
||||
final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
|
||||
|
||||
ZkNodeProps ourProps = new ZkNodeProps(props);
|
||||
String collection = cd.getCloudDescriptor()
|
||||
.getCollectionName();
|
||||
|
||||
|
||||
ElectionContext context = new ShardLeaderElectionContext(leaderElector, shardId,
|
||||
collection, coreNodeName, ourProps, this, cc);
|
||||
|
||||
leaderElector.setup(context);
|
||||
electionContexts.put(new ContextKey(collection, coreNodeName), context);
|
||||
electionContexts.put(contextKey, context);
|
||||
leaderElector.joinElection(context, false);
|
||||
}
|
||||
|
||||
|
|
|
@ -34,10 +34,9 @@ public class ConnectionManager implements Watcher {
|
|||
.getLogger(ConnectionManager.class);
|
||||
|
||||
private final String name;
|
||||
private CountDownLatch clientConnected;
|
||||
private KeeperState state = KeeperState.Disconnected;
|
||||
private final CountDownLatch clientConnected = new CountDownLatch(1);
|
||||
|
||||
private boolean connected = false;
|
||||
private boolean likelyExpired = true;
|
||||
|
||||
private final ZkClientConnectionStrategy connectionStrategy;
|
||||
|
||||
|
@ -48,7 +47,9 @@ public class ConnectionManager implements Watcher {
|
|||
private final OnReconnect onReconnect;
|
||||
private final BeforeReconnect beforeReconnect;
|
||||
|
||||
private volatile KeeperState state = KeeperState.Disconnected;
|
||||
private volatile boolean isClosed = false;
|
||||
private volatile boolean likelyExpired = true;
|
||||
|
||||
private volatile Timer disconnectedTimer;
|
||||
|
||||
|
@ -59,16 +60,16 @@ public class ConnectionManager implements Watcher {
|
|||
this.zkServerAddress = zkServerAddress;
|
||||
this.onReconnect = onConnect;
|
||||
this.beforeReconnect = beforeReconnect;
|
||||
clientConnected = new CountDownLatch(1);
|
||||
}
|
||||
|
||||
private synchronized void connected() {
|
||||
connected = true;
|
||||
if (disconnectedTimer != null) {
|
||||
disconnectedTimer.cancel();
|
||||
disconnectedTimer = null;
|
||||
}
|
||||
connected = true;
|
||||
likelyExpired = false;
|
||||
notifyAll();
|
||||
}
|
||||
|
||||
private synchronized void disconnected() {
|
||||
|
@ -82,18 +83,17 @@ public class ConnectionManager implements Watcher {
|
|||
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (ConnectionManager.this) {
|
||||
likelyExpired = true;
|
||||
}
|
||||
likelyExpired = true;
|
||||
}
|
||||
|
||||
}, (long) (client.getZkClientTimeout() * 0.90));
|
||||
}
|
||||
connected = false;
|
||||
notifyAll();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void process(WatchedEvent event) {
|
||||
public void process(WatchedEvent event) {
|
||||
if (log.isInfoEnabled()) {
|
||||
log.info("Watcher " + this + " name:" + name + " got event " + event
|
||||
+ " path:" + event.getPath() + " type:" + event.getType());
|
||||
|
@ -103,8 +103,9 @@ public class ConnectionManager implements Watcher {
|
|||
log.info("Client->ZooKeeper status change trigger but we are already closed");
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
state = event.getState();
|
||||
|
||||
if (state == KeeperState.SyncConnected) {
|
||||
connected();
|
||||
clientConnected.countDown();
|
||||
|
@ -117,12 +118,16 @@ public class ConnectionManager implements Watcher {
|
|||
|
||||
connected = false;
|
||||
likelyExpired = true;
|
||||
|
||||
log.info("Our previous ZooKeeper session was expired. Attempting to reconnect to recover relationship with ZooKeeper...");
|
||||
|
||||
if (beforeReconnect != null) {
|
||||
beforeReconnect.command();
|
||||
}
|
||||
|
||||
try {
|
||||
connectionStrategy.reconnect(zkServerAddress, client.getZkClientTimeout(), this,
|
||||
connectionStrategy.reconnect(zkServerAddress,
|
||||
client.getZkClientTimeout(), this,
|
||||
new ZkClientConnectionStrategy.ZkUpdate() {
|
||||
@Override
|
||||
public void update(SolrZooKeeper keeper) {
|
||||
|
@ -146,12 +151,22 @@ public class ConnectionManager implements Watcher {
|
|||
throw new RuntimeException(t);
|
||||
}
|
||||
|
||||
if (onReconnect != null) {
|
||||
onReconnect.command();
|
||||
}
|
||||
|
||||
connected();
|
||||
|
||||
if (onReconnect != null) {
|
||||
Thread thread = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
onReconnect.command();
|
||||
} catch (Exception e) {
|
||||
log.warn("Exception running onReconnect command", e);
|
||||
}
|
||||
}
|
||||
};
|
||||
thread.start();
|
||||
}
|
||||
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
|
@ -165,7 +180,6 @@ public class ConnectionManager implements Watcher {
|
|||
} else {
|
||||
disconnected();
|
||||
}
|
||||
notifyAll();
|
||||
}
|
||||
|
||||
public synchronized boolean isConnected() {
|
||||
|
@ -185,12 +199,8 @@ public class ConnectionManager implements Watcher {
|
|||
this.disconnectedTimer = null;
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized KeeperState state() {
|
||||
return state;
|
||||
}
|
||||
|
||||
public synchronized boolean isLikelyExpired() {
|
||||
public boolean isLikelyExpired() {
|
||||
return likelyExpired;
|
||||
}
|
||||
|
||||
|
@ -241,4 +251,4 @@ public class ConnectionManager implements Watcher {
|
|||
"", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue