mirror of https://github.com/apache/lucene.git
SOLR-8599: After a failed connection during construction of SolrZkClient attempt to retry until a connection can be made
This commit is contained in:
parent
eeee1c3f40
commit
2c0a5e3036
|
@ -198,6 +198,9 @@ Bug Fixes
|
|||
* SOLR-8697: Scope ZK election nodes by session to prevent elections from interfering with each other
|
||||
and other small LeaderElector improvements. (Scott Blum via Mark Miller)
|
||||
|
||||
* SOLR-8599: After a failed connection during construction of SolrZkClient attempt to retry until a connection
|
||||
can be made. (Keith Laban, Dennis Gove)
|
||||
|
||||
Optimizations
|
||||
----------------------
|
||||
* SOLR-7876: Speed up queries and operations that use many terms when timeAllowed has not been
|
||||
|
|
|
@ -16,14 +16,20 @@
|
|||
*/
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.lucene.util.LuceneTestCase.Slow;
|
||||
import org.apache.solr.SolrTestCaseJ4;
|
||||
import org.apache.solr.common.cloud.ConnectionManager;
|
||||
import org.apache.solr.common.cloud.SolrZkClient;
|
||||
import org.apache.solr.util.DefaultSolrThreadFactory;
|
||||
import org.apache.zookeeper.WatchedEvent;
|
||||
import org.apache.zookeeper.Watcher.Event.EventType;
|
||||
import org.apache.zookeeper.Watcher.Event.KeeperState;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
@Slow
|
||||
public class ConnectionManagerTest extends SolrTestCaseJ4 {
|
||||
|
@ -108,4 +114,44 @@ public class ConnectionManagerTest extends SolrTestCaseJ4 {
|
|||
server.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReconnectWhenZkDisappeared() throws Exception {
|
||||
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new DefaultSolrThreadFactory("connectionManagerTest"));
|
||||
|
||||
// setup a SolrZkClient to do some getBaseUrlForNodeName testing
|
||||
String zkDir = createTempDir("zkData").toFile().getAbsolutePath();
|
||||
|
||||
ZkTestServer server = new ZkTestServer(zkDir);
|
||||
try {
|
||||
server.run();
|
||||
|
||||
AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
|
||||
AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
|
||||
|
||||
SolrZkClient zkClient = new SolrZkClient(server.getZkAddress(), TIMEOUT);
|
||||
ConnectionManager cm = zkClient.getConnectionManager();
|
||||
try {
|
||||
assertFalse(cm.isLikelyExpired());
|
||||
assertTrue(cm.isConnected());
|
||||
|
||||
|
||||
cm.setZkServerAddress("http://BADADDRESS");
|
||||
executor.schedule(() -> {
|
||||
cm.setZkServerAddress(server.getZkAddress());
|
||||
}, 5, TimeUnit.SECONDS);
|
||||
|
||||
// reconnect -- should no longer be likely expired
|
||||
cm.process(new WatchedEvent(EventType.None, KeeperState.Expired, ""));
|
||||
assertFalse(cm.isLikelyExpired());
|
||||
assertTrue(cm.isConnected());
|
||||
} finally {
|
||||
cm.close();
|
||||
zkClient.close();
|
||||
executor.shutdown();
|
||||
}
|
||||
} finally {
|
||||
server.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -37,7 +37,10 @@ public class ConnectionManager implements Watcher {
|
|||
|
||||
private final ZkClientConnectionStrategy connectionStrategy;
|
||||
|
||||
private final String zkServerAddress;
|
||||
//expert: mutable for testing
|
||||
private String zkServerAddress;
|
||||
|
||||
|
||||
|
||||
private final SolrZkClient client;
|
||||
|
||||
|
@ -128,40 +131,50 @@ public class ConnectionManager implements Watcher {
|
|||
}
|
||||
}
|
||||
|
||||
try {
|
||||
connectionStrategy.reconnect(zkServerAddress,
|
||||
client.getZkClientTimeout(), this,
|
||||
new ZkClientConnectionStrategy.ZkUpdate() {
|
||||
@Override
|
||||
public void update(SolrZooKeeper keeper) {
|
||||
try {
|
||||
waitForConnected(Long.MAX_VALUE);
|
||||
} catch (Exception e1) {
|
||||
closeKeeper(keeper);
|
||||
throw new RuntimeException(e1);
|
||||
do {
|
||||
// This loop will break iff a valid connection is made. If a connection is not made then it will repeat and
|
||||
// try again to create a new connection.
|
||||
try {
|
||||
connectionStrategy.reconnect(zkServerAddress,
|
||||
client.getZkClientTimeout(), this,
|
||||
new ZkClientConnectionStrategy.ZkUpdate() {
|
||||
@Override
|
||||
public void update(SolrZooKeeper keeper) {
|
||||
try {
|
||||
waitForConnected(Long.MAX_VALUE);
|
||||
} catch (Exception e1) {
|
||||
closeKeeper(keeper);
|
||||
throw new RuntimeException(e1);
|
||||
}
|
||||
|
||||
log.info("Connection with ZooKeeper reestablished.");
|
||||
try {
|
||||
client.updateKeeper(keeper);
|
||||
} catch (InterruptedException e) {
|
||||
closeKeeper(keeper);
|
||||
Thread.currentThread().interrupt();
|
||||
// we must have been asked to stop
|
||||
throw new RuntimeException(e);
|
||||
} catch (Exception t) {
|
||||
closeKeeper(keeper);
|
||||
throw new RuntimeException(t);
|
||||
}
|
||||
|
||||
if (onReconnect != null) {
|
||||
onReconnect.command();
|
||||
}
|
||||
}
|
||||
|
||||
log.info("Connection with ZooKeeper reestablished.");
|
||||
try {
|
||||
client.updateKeeper(keeper);
|
||||
} catch (InterruptedException e) {
|
||||
closeKeeper(keeper);
|
||||
Thread.currentThread().interrupt();
|
||||
// we must have been asked to stop
|
||||
throw new RuntimeException(e);
|
||||
} catch (Exception t) {
|
||||
closeKeeper(keeper);
|
||||
throw new RuntimeException(t);
|
||||
}
|
||||
|
||||
if (onReconnect != null) {
|
||||
onReconnect.command();
|
||||
}
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
SolrException.log(log, "", e);
|
||||
}
|
||||
});
|
||||
|
||||
break;
|
||||
|
||||
} catch (Exception e) {
|
||||
SolrException.log(log, "", e);
|
||||
log.info("Could not connect due to error, sleeping for 5s and trying agian");
|
||||
waitSleep(1000);
|
||||
}
|
||||
|
||||
} while (!isClosed);
|
||||
log.info("Connected:" + connected);
|
||||
} else if (state == KeeperState.Disconnected) {
|
||||
log.info("zkClient has disconnected");
|
||||
|
@ -186,6 +199,14 @@ public class ConnectionManager implements Watcher {
|
|||
public boolean isLikelyExpired() {
|
||||
return isClosed || likelyExpiredState.isLikelyExpired((long) (client.getZkClientTimeout() * 0.90));
|
||||
}
|
||||
|
||||
public synchronized void waitSleep(long waitFor) {
|
||||
try {
|
||||
wait(waitFor);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void waitForConnected(long waitForConnection)
|
||||
throws TimeoutException {
|
||||
|
@ -234,4 +255,9 @@ public class ConnectionManager implements Watcher {
|
|||
"", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//expert: mutable for testing
|
||||
public void setZkServerAddress(String zkServerAddress) {
|
||||
this.zkServerAddress = zkServerAddress;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue