mirror of https://github.com/apache/lucene.git
SOLR-13393: Fixed ZkClientClusterStateProvider to prevent risk of leaking ZkStateReader/threads when processing concurrent requests during shutdown.
This primarily affected tests, but may have also caused odd errors/delays when restart/shutting down solr nodes.
This commit is contained in:
parent
5f34c49813
commit
980fd7d5c5
|
@ -174,6 +174,10 @@ Bug Fixes
|
|||
|
||||
* SOLR-13339: Prevent recovery, fetching index being kicked off after SolrCores already closed (Cao Manh Dat)
|
||||
|
||||
* SOLR-13393: Fixed ZkClientClusterStateProvider to prevent risk of leaking ZkStateReader/threads when
|
||||
processing concurrent requests during shutdown. This primarily affected tests, but may have also caused
|
||||
odd errors/delays when restart/shutting down solr nodes. (hossman)
|
||||
|
||||
Improvements
|
||||
----------------------
|
||||
|
||||
|
|
|
@ -64,21 +64,17 @@ public class ZkClientClusterStateProvider implements ClusterStateProvider {
|
|||
|
||||
@Override
|
||||
public ClusterState.CollectionRef getState(String collection) {
|
||||
ClusterState clusterState = zkStateReader.getClusterState();
|
||||
ClusterState clusterState = getZkStateReader().getClusterState();
|
||||
if (clusterState != null) {
|
||||
return clusterState.getCollectionRef(collection);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
public ZkStateReader getZkStateReader(){
|
||||
return zkStateReader;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Set<String> getLiveNodes() {
|
||||
if (isClosed) throw new AlreadyClosedException();
|
||||
ClusterState clusterState = zkStateReader.getClusterState();
|
||||
ClusterState clusterState = getZkStateReader().getClusterState();
|
||||
if (clusterState != null) {
|
||||
return clusterState.getLiveNodes();
|
||||
} else {
|
||||
|
@ -89,23 +85,23 @@ public class ZkClientClusterStateProvider implements ClusterStateProvider {
|
|||
|
||||
@Override
|
||||
public List<String> resolveAlias(String alias) {
|
||||
return zkStateReader.getAliases().resolveAliases(alias); // if not an alias, returns itself
|
||||
return getZkStateReader().getAliases().resolveAliases(alias); // if not an alias, returns itself
|
||||
}
|
||||
|
||||
@Override
|
||||
public String resolveSimpleAlias(String alias) throws IllegalArgumentException {
|
||||
return zkStateReader.getAliases().resolveSimpleAlias(alias);
|
||||
return getZkStateReader().getAliases().resolveSimpleAlias(alias);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getClusterProperty(String propertyName) {
|
||||
Map<String, Object> props = zkStateReader.getClusterProperties();
|
||||
Map<String, Object> props = getZkStateReader().getClusterProperties();
|
||||
return props.get(propertyName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T getClusterProperty(String propertyName, T def) {
|
||||
Map<String, Object> props = zkStateReader.getClusterProperties();
|
||||
Map<String, Object> props = getZkStateReader().getClusterProperties();
|
||||
if (props.containsKey(propertyName)) {
|
||||
return (T)props.get(propertyName);
|
||||
}
|
||||
|
@ -114,12 +110,12 @@ public class ZkClientClusterStateProvider implements ClusterStateProvider {
|
|||
|
||||
@Override
|
||||
public ClusterState getClusterState() throws IOException {
|
||||
return zkStateReader.getClusterState();
|
||||
return getZkStateReader().getClusterState();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getClusterProperties() {
|
||||
return zkStateReader.getClusterProperties();
|
||||
return getZkStateReader().getClusterProperties();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -135,8 +131,7 @@ public class ZkClientClusterStateProvider implements ClusterStateProvider {
|
|||
* @throws IOException if an I/O exception occurs
|
||||
*/
|
||||
public void downloadConfig(String configName, Path downloadPath) throws IOException {
|
||||
connect();
|
||||
zkStateReader.getConfigManager().downloadConfigDir(configName, downloadPath);
|
||||
getZkStateReader().getConfigManager().downloadConfigDir(configName, downloadPath);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -151,21 +146,31 @@ public class ZkClientClusterStateProvider implements ClusterStateProvider {
|
|||
* @throws IOException if an IO error occurs
|
||||
*/
|
||||
public void uploadConfig(Path configPath, String configName) throws IOException {
|
||||
connect();
|
||||
zkStateReader.getConfigManager().uploadConfigDir(configPath, configName);
|
||||
getZkStateReader().getConfigManager().uploadConfigDir(configPath, configName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void connect() {
|
||||
// Esentially a No-Op, but force a check that we're not closed and the ZkStateReader is available...
|
||||
final ZkStateReader ignored = getZkStateReader();
|
||||
}
|
||||
|
||||
public ZkStateReader getZkStateReader() {
|
||||
if (isClosed) { // quick check...
|
||||
throw new AlreadyClosedException();
|
||||
}
|
||||
if (zkStateReader == null) {
|
||||
synchronized (this) {
|
||||
if (isClosed) { // while we were waiting for sync lock another thread may have closed
|
||||
throw new AlreadyClosedException();
|
||||
}
|
||||
if (zkStateReader == null) {
|
||||
ZkStateReader zk = null;
|
||||
try {
|
||||
zk = new ZkStateReader(zkHost, zkClientTimeout, zkConnectTimeout);
|
||||
zk.createClusterStateWatchersAndUpdate();
|
||||
zkStateReader = zk;
|
||||
log.info("Cluster at {} ready", zkHost);
|
||||
zkStateReader = zk;
|
||||
} catch (InterruptedException e) {
|
||||
zk.close();
|
||||
Thread.currentThread().interrupt();
|
||||
|
@ -181,16 +186,22 @@ public class ZkClientClusterStateProvider implements ClusterStateProvider {
|
|||
}
|
||||
}
|
||||
}
|
||||
return zkStateReader;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
isClosed = true;
|
||||
if (zkStateReader != null && closeZkStateReader) {
|
||||
synchronized (this) {
|
||||
if (zkStateReader != null)
|
||||
zkStateReader.close();
|
||||
synchronized (this) {
|
||||
if (false == isClosed && zkStateReader != null) {
|
||||
isClosed = true;
|
||||
|
||||
// force zkStateReader to null first so that any parallel calls drop into the synch block
|
||||
// getZkStateReader() as soon as possible.
|
||||
final ZkStateReader zkToClose = zkStateReader;
|
||||
zkStateReader = null;
|
||||
if (closeZkStateReader) {
|
||||
zkToClose.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -230,4 +241,9 @@ public class ZkClientClusterStateProvider implements ClusterStateProvider {
|
|||
public String toString() {
|
||||
return zkHost;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isClosed() {
|
||||
return isClosed;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,7 +16,6 @@
|
|||
*/
|
||||
package org.apache.solr.common.cloud;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
|
@ -47,6 +46,7 @@ import java.util.stream.Collectors;
|
|||
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
|
||||
import org.apache.solr.common.AlreadyClosedException;
|
||||
import org.apache.solr.common.Callable;
|
||||
import org.apache.solr.common.SolrCloseable;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.SolrException.ErrorCode;
|
||||
import org.apache.solr.common.params.AutoScalingParams;
|
||||
|
@ -74,7 +74,7 @@ import static java.util.Collections.emptySortedSet;
|
|||
import static java.util.Collections.unmodifiableSet;
|
||||
import static org.apache.solr.common.util.Utils.fromJSON;
|
||||
|
||||
public class ZkStateReader implements Closeable {
|
||||
public class ZkStateReader implements SolrCloseable {
|
||||
public static final int STATE_UPDATE_DELAY = Integer.getInteger("solr.OverseerStateUpdateDelay", 2000); // delay between cloud state updates
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
|
@ -851,6 +851,11 @@ public class ZkStateReader implements Closeable {
|
|||
}
|
||||
assert ObjectReleaseTracker.release(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isClosed() {
|
||||
return closed;
|
||||
}
|
||||
|
||||
public String getLeaderUrl(String collection, String shard, int timeout) throws InterruptedException {
|
||||
ZkCoreNodeProps props = new ZkCoreNodeProps(getLeaderRetry(collection, shard, timeout));
|
||||
|
|
|
@ -22,8 +22,10 @@ import java.util.concurrent.TimeUnit;
|
|||
import org.apache.solr.SolrTestCaseJ4;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.cloud.MiniSolrCloudCluster;
|
||||
import org.apache.solr.common.AlreadyClosedException;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.ZkConfigManager;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestCloudSolrClientConnections extends SolrTestCaseJ4 {
|
||||
|
@ -89,4 +91,56 @@ public class TestCloudSolrClientConnections extends SolrTestCaseJ4 {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAlreadyClosedClusterStateProvider() throws Exception {
|
||||
|
||||
final MiniSolrCloudCluster cluster = new MiniSolrCloudCluster(1, createTempDir(),
|
||||
buildJettyConfig("/solr"));
|
||||
// from a client perspective the behavior of ZkClientClusterStateProvider should be
|
||||
// consistent regardless of wether it's constructed with a zkhost or an existing ZkStateReader
|
||||
try {
|
||||
final ZkClientClusterStateProvider zkHost_provider
|
||||
= new ZkClientClusterStateProvider(cluster.getZkServer().getZkAddress());
|
||||
|
||||
checkAndCloseProvider(zkHost_provider);
|
||||
|
||||
final ZkStateReader reusedZkReader = new ZkStateReader(cluster.getZkClient());
|
||||
try {
|
||||
reusedZkReader.createClusterStateWatchersAndUpdate();
|
||||
final ZkClientClusterStateProvider reader_provider = new ZkClientClusterStateProvider(reusedZkReader);
|
||||
checkAndCloseProvider(reader_provider);
|
||||
|
||||
// but in the case of a reused StateZkReader,
|
||||
// closing the provider must not have closed the ZkStateReader...
|
||||
assertEquals(false, reusedZkReader.isClosed());
|
||||
|
||||
} finally {
|
||||
reusedZkReader.close();
|
||||
}
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
/** NOTE: will close the provider and assert it starts throwing AlreadyClosedException */
|
||||
private void checkAndCloseProvider(final ZkClientClusterStateProvider provider) throws Exception {
|
||||
if (random().nextBoolean()) {
|
||||
// calling connect should be purely optional and affect nothing
|
||||
provider.connect();
|
||||
}
|
||||
assertNotNull(provider.getClusterState());
|
||||
|
||||
provider.close();
|
||||
|
||||
if (random().nextBoolean()) {
|
||||
expectThrows(AlreadyClosedException.class, () -> {
|
||||
provider.connect();
|
||||
});
|
||||
}
|
||||
expectThrows(AlreadyClosedException.class, () -> {
|
||||
Object ignored = provider.getClusterState();
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue