mirror of https://github.com/apache/lucene.git
try to handle reconnects that happens when things are slow
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1329650 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b3241a23b3
commit
027b6ce3bb
|
@ -18,6 +18,7 @@ package org.apache.solr.cloud;
|
|||
*/
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
|
@ -28,6 +29,7 @@ import java.util.concurrent.ScheduledExecutorService;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.solr.SolrTestCaseJ4;
|
||||
import org.apache.solr.common.cloud.OnReconnect;
|
||||
import org.apache.solr.common.cloud.SolrZkClient;
|
||||
import org.apache.solr.common.cloud.ZkCoreNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
|
@ -81,43 +83,47 @@ public class LeaderElectionTest extends SolrTestCaseJ4 {
|
|||
private volatile boolean stop;
|
||||
private volatile boolean electionDone = false;
|
||||
private final ZkNodeProps props;
|
||||
private ZkStateReader zkStateReader;
|
||||
|
||||
|
||||
public ClientThread(int nodeNumber) throws Exception {
|
||||
super("Thread-" + nodeNumber);
|
||||
boolean created = false;
|
||||
this.zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
|
||||
try {
|
||||
this.zkStateReader = new ZkStateReader(zkClient);
|
||||
this.nodeNumber = nodeNumber;
|
||||
props = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, Integer.toString(nodeNumber), ZkStateReader.CORE_NAME_PROP, "");
|
||||
created = true;
|
||||
} finally {
|
||||
if (!created) {
|
||||
zkClient.close();
|
||||
|
||||
props = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, Integer.toString(nodeNumber), ZkStateReader.CORE_NAME_PROP, "");
|
||||
|
||||
this.zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT, TIMEOUT, new OnReconnect() {
|
||||
|
||||
@Override
|
||||
public void command() {
|
||||
try {
|
||||
setupOnConnect();
|
||||
} catch (Throwable t) {
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
this.nodeNumber = nodeNumber;
|
||||
}
|
||||
|
||||
private void setupOnConnect() throws InterruptedException, KeeperException,
|
||||
IOException {
|
||||
ZkStateReader zkStateReader = new ZkStateReader(zkClient);
|
||||
LeaderElector elector = new LeaderElector(zkClient);
|
||||
ShardLeaderElectionContextBase context = new ShardLeaderElectionContextBase(
|
||||
elector, "shard1", "collection1", Integer.toString(nodeNumber),
|
||||
props, zkStateReader);
|
||||
elector.setup(context);
|
||||
seq = elector.joinElection(context);
|
||||
electionDone = true;
|
||||
seqToThread.put(seq, this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
LeaderElector elector = new LeaderElector(ClientThread.this.zkClient);
|
||||
|
||||
ElectionContext context = new ShardLeaderElectionContextBase(elector, "shard1",
|
||||
"collection1", Integer.toString(nodeNumber), props, this.zkStateReader);
|
||||
|
||||
try {
|
||||
elector.setup(context);
|
||||
seq = elector.joinElection(context);
|
||||
electionDone = true;
|
||||
seqToThread.put(seq, this);
|
||||
} catch (InterruptedException e) {
|
||||
return;
|
||||
} catch (Throwable e) {
|
||||
//e.printStackTrace();
|
||||
}
|
||||
try {
|
||||
setupOnConnect();
|
||||
} catch (InterruptedException e) {
|
||||
return;
|
||||
} catch (Throwable e) {
|
||||
// e.printStackTrace();
|
||||
}
|
||||
|
||||
while (!stop) {
|
||||
try {
|
||||
|
|
Loading…
Reference in New Issue