reformatting

Noble Paul 2019-06-25 17:02:17 +10:00
parent 4589bbe47b
commit 742c80550c
8 changed files with 716 additions and 639 deletions

BlobRepository.java

@@ -60,7 +60,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
* The purpose of this class is to store the Jars loaded in memory and to keep only one copy of the Jar in a single node.
*/
public class BlobRepository {
private static final long MAX_JAR_SIZE = Long.parseLong(System.getProperty("runtme.lib.size", String.valueOf(5* 1024*1024)));
private static final long MAX_JAR_SIZE = Long.parseLong(System.getProperty("runtme.lib.size", String.valueOf(5 * 1024 * 1024)));
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
static final Random RANDOM;
static final Pattern BLOB_KEY_PATTERN_CHECKER = Pattern.compile(".*/\\d+");
@@ -89,6 +89,7 @@ public class BlobRepository {
}
// I wanted to {@link SolrCore#loadDecodeAndCacheBlob(String, Decoder)} below but precommit complains
/**
* Returns the contents of a blob containing a ByteBuffer and increments a reference count. Please return the
* same object to decrease the refcount. This is normally used for storing jar files, and binary raw data.
@@ -98,7 +99,7 @@ public class BlobRepository {
* @return The reference of a blob
*/
public BlobContentRef<ByteBuffer> getBlobIncRef(String key) {
return getBlobIncRef(key, () -> addBlob(key));
return getBlobIncRef(key, () -> addBlob(key));
}
/**
@@ -107,18 +108,18 @@ public class BlobRepository {
* authors attempting to share objects across cores should use
* {@code SolrCore#loadDecodeAndCacheBlob(String, Decoder)} which ensures that a proper close hook is also created.
*
* @param key it is a combination of blob name and version like blobName/version
* @param key it is a combination of blob name and version like blobName/version
* @param decoder a decoder that knows how to interpret the bytes from the blob
* @return The reference of a blob
*/
BlobContentRef<Object> getBlobIncRef(String key, Decoder<Object> decoder) {
return getBlobIncRef(key.concat(decoder.getName()), () -> addBlob(key,decoder));
return getBlobIncRef(key.concat(decoder.getName()), () -> addBlob(key, decoder));
}
BlobContentRef getBlobIncRef(String key, Decoder decoder, String url, String sha512) {
StringBuffer keyBuilder = new StringBuffer(key);
if (decoder != null) keyBuilder .append( decoder.getName());
keyBuilder.append("/").append( sha512);
if (decoder != null) keyBuilder.append(decoder.getName());
keyBuilder.append("/").append(sha512);
return getBlobIncRef(keyBuilder.toString(), () -> new BlobContent<>(key, fetchBlobAndVerify(key, url, sha512), decoder));
}
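For orientation while reading this file's hunks: each getBlobIncRef acquisition is meant to be paired with a release. A minimal usage sketch, assuming the repository's decrementBlobRefCount(BlobContentRef) counterpart and the ref's public blob field, neither of which appears in this diff:
BlobRepository repo = coreContainer.getBlobRepository();
// keys must match BLOB_KEY_PATTERN_CHECKER, i.e. blobName/version
BlobRepository.BlobContentRef<ByteBuffer> ref = repo.getBlobIncRef("myLib/1");
try {
    ByteBuffer bytes = ref.blob.get(); // assumption: BlobContent exposes its payload via get()
    // ... read the raw bytes ...
} finally {
    repo.decrementBlobRefCount(ref); // hand the same reference back to decrease the refcount
}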
@@ -133,7 +134,7 @@ public class BlobRepository {
try {
aBlob = blobCreator.call();
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Blob loading failed: "+e.getMessage(), e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Blob loading failed: " + e.getMessage(), e);
}
}
}
@@ -151,7 +152,7 @@ public class BlobRepository {
// For use cases sharing raw bytes
private BlobContent<ByteBuffer> addBlob(String key) {
ByteBuffer b = fetchBlob(key);
BlobContent<ByteBuffer> aBlob = new BlobContent<>(key, b);
BlobContent<ByteBuffer> aBlob = new BlobContent<>(key, b);
blobs.put(key, aBlob);
return aBlob;
}
@@ -159,11 +160,12 @@ public class BlobRepository {
// for use cases sharing java objects
private BlobContent<Object> addBlob(String key, Decoder<Object> decoder) {
ByteBuffer b = fetchBlob(key);
String keyPlusName = key + decoder.getName();
String keyPlusName = key + decoder.getName();
BlobContent<Object> aBlob = new BlobContent<>(keyPlusName, b, decoder);
blobs.put(keyPlusName, aBlob);
return aBlob;
}
static String INVALID_JAR_MSG = "Invalid jar from {0} , expected sha512 hash : {1} , actual : {2}";
private ByteBuffer fetchBlobAndVerify(String key, String url, String sha512) {
@@ -193,7 +195,7 @@ public class BlobRepository {
/**
* Package local for unit tests only please do not use elsewhere
* Package local for unit tests only please do not use elsewhere
*/
ByteBuffer fetchBlob(String key) {
Replica replica = getSystemCollReplica();
@@ -234,9 +236,11 @@ public class BlobRepository {
ZkStateReader zkStateReader = this.coreContainer.getZkController().getZkStateReader();
ClusterState cs = zkStateReader.getClusterState();
DocCollection coll = cs.getCollectionOrNull(CollectionAdminParams.SYSTEM_COLL);
if (coll == null) throw new SolrException(SERVICE_UNAVAILABLE, CollectionAdminParams.SYSTEM_COLL + " collection not available");
if (coll == null)
throw new SolrException(SERVICE_UNAVAILABLE, CollectionAdminParams.SYSTEM_COLL + " collection not available");
ArrayList<Slice> slices = new ArrayList<>(coll.getActiveSlices());
if (slices.isEmpty()) throw new SolrException(SERVICE_UNAVAILABLE, "No active slices for " + CollectionAdminParams.SYSTEM_COLL + " collection");
if (slices.isEmpty())
throw new SolrException(SERVICE_UNAVAILABLE, "No active slices for " + CollectionAdminParams.SYSTEM_COLL + " collection");
Collections.shuffle(slices, RANDOM); //do load balancing
Replica replica = null;
@@ -245,7 +249,7 @@ public class BlobRepository {
Collections.shuffle(replicas, RANDOM);
for (Replica r : replicas) {
if (r.getState() == Replica.State.ACTIVE) {
if(zkStateReader.getClusterState().getLiveNodes().contains(r.get(ZkStateReader.NODE_NAME_PROP))){
if (zkStateReader.getClusterState().getLiveNodes().contains(r.get(ZkStateReader.NODE_NAME_PROP))) {
replica = r;
break;
} else {
@@ -313,7 +317,9 @@ public class BlobRepository {
*
* @return The name of the decoding, defaults to empty string.
*/
default String getName() { return ""; }
default String getName() {
return "";
}
/**
* A routine that knows how to convert the stream of bytes from the blob into a Java object.
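The Decoder above is the extension point used by the Object-caching overloads earlier in this file. A sketch of an implementation, assuming the interface's single abstract method receives the blob's InputStream (consistent with the comment above; the exact signature sits outside this hunk):
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

Decoder<Properties> propsDecoder = new Decoder<Properties>() {
    @Override
    public String getName() {
        return "props"; // appended to the blob key, so differently-decoded copies cache separately
    }

    @Override
    public Properties decode(InputStream inputStream) {
        Properties props = new Properties();
        try {
            props.load(inputStream);
        } catch (IOException e) {
            throw new RuntimeException("Failed to decode blob", e);
        }
        return props;
    }
};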

CachingDirectoryFactory.java

@@ -44,9 +44,8 @@ import org.slf4j.LoggerFactory;
* A {@link DirectoryFactory} impl base class for caching Directory instances
* per path. Most DirectoryFactory implementations will want to extend this
* class and simply implement {@link DirectoryFactory#create(String, LockFactory, DirContext)}.
*
* <p>
* This is an expert class and these API's are subject to change.
*
*/
public abstract class CachingDirectoryFactory extends DirectoryFactory {
protected static class CacheValue {
@@ -64,6 +63,7 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
// for debug
// this.originTrace = new RuntimeException("Originated from:");
}
public int refCnt = 1;
// has doneWithDirectory(Directory) been called on this?
public boolean closeCacheValueCalled = false;
@@ -88,11 +88,11 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected Map<String,CacheValue> byPathCache = new HashMap<>();
protected Map<String, CacheValue> byPathCache = new HashMap<>();
protected Map<Directory,CacheValue> byDirectoryCache = new IdentityHashMap<>();
protected Map<Directory, CacheValue> byDirectoryCache = new IdentityHashMap<>();
protected Map<Directory,List<CloseListener>> closeListeners = new HashMap<>();
protected Map<Directory, List<CloseListener>> closeListeners = new HashMap<>();
protected Set<CacheValue> removeEntries = new HashSet<>();
@@ -162,12 +162,12 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
Collection<CacheValue> values = byDirectoryCache.values();
for (CacheValue val : values) {
log.debug("Closing {} - currently tracking: {}",
this.getClass().getSimpleName(), val);
this.getClass().getSimpleName(), val);
try {
// if there are still refs out, we have to wait for them
assert val.refCnt > -1 : val.refCnt;
int cnt = 0;
while(val.refCnt != 0) {
while (val.refCnt != 0) {
wait(100);
if (cnt++ >= 120) {
@@ -329,7 +329,7 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
* java.lang.String, boolean)
*/
@Override
public final Directory get(String path, DirContext dirContext, String rawLockType)
public final Directory get(String path, DirContext dirContext, String rawLockType)
throws IOException {
String fullPath = normalize(path);
synchronized (this) {
@@ -475,7 +475,7 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
}
protected synchronized void removeDirectory(CacheValue cacheValue) throws IOException {
// this page intentionally left blank
// this page intentionally left blank
}
@Override
@@ -493,8 +493,8 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
/**
* Method for inspecting the cache
* @return paths in the cache which have not been marked "done"
*
* @return paths in the cache which have not been marked "done"
* @see #doneWithDirectory
*/
public synchronized Set<String> getLivePaths() {
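The refCnt bookkeeping in CacheValue above is what makes the acquire/release pairing mandatory; a minimal sketch, assuming the release(Directory) counterpart declared on the DirectoryFactory base class:
Directory dir = directoryFactory.get("/var/solr/data/core1/index", // path is illustrative
    DirectoryFactory.DirContext.DEFAULT, DirectoryFactory.LOCK_TYPE_NATIVE);
try {
    // ... read or write through the Directory ...
} finally {
    directoryFactory.release(dir); // decrements CacheValue.refCnt so a later close() can proceed
}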

CloudConfig.java

@@ -208,8 +208,8 @@ public class CloudConfig {
public CloudConfig build() {
return new CloudConfig(zkHost, zkClientTimeout, hostPort, hostName, hostContext, useGenericCoreNames, leaderVoteWait,
leaderConflictResolveWait, autoReplicaFailoverWaitAfterExpiration, zkCredentialsProviderClass, zkACLProviderClass, createCollectionWaitTimeTillActive,
createCollectionCheckLeaderActive);
leaderConflictResolveWait, autoReplicaFailoverWaitAfterExpiration, zkCredentialsProviderClass, zkACLProviderClass, createCollectionWaitTimeTillActive,
createCollectionCheckLeaderActive);
}
}
}

ConfigSetProperties.java

@@ -42,7 +42,7 @@ public class ConfigSetProperties {
* Return the properties associated with the ConfigSet (e.g. immutable)
*
* @param loader the resource loader
* @param name the name of the config set properties file
* @param name the name of the config set properties file
* @return the properties in a NamedList
*/
public static NamedList readFromResourceLoader(SolrResourceLoader loader, String name) {
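A typical call site for the reader above, assuming the conventional configsetprops.json resource name (the filename constant is outside this hunk):
NamedList props = ConfigSetProperties.readFromResourceLoader(loader, "configsetprops.json");
Object immutable = props.get("immutable"); // e.g. whether the ConfigSet may be modified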
@@ -70,7 +70,7 @@ public class ConfigSetProperties {
final String objectClass = object == null ? "null" : object.getClass().getName();
throw new SolrException(ErrorCode.SERVER_ERROR, "Invalid JSON type " + objectClass + ", expected Map");
}
return new NamedList((Map)object);
return new NamedList((Map) object);
} catch (Exception ex) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to load ConfigSet properties", ex);
} finally {

CoreContainer.java

@@ -134,7 +134,6 @@ import static org.apache.solr.core.CorePropertiesLocator.PROPERTIES_FILENAME;
import static org.apache.solr.security.AuthenticationPlugin.AUTHENTICATION_PLUGIN_PROP;
/**
*
* @since solr 1.3
*/
public class CoreContainer {
@@ -175,7 +174,7 @@ public class CoreContainer {
private volatile UpdateShardHandler updateShardHandler;
private volatile ExecutorService coreContainerWorkExecutor = ExecutorUtil.newMDCAwareCachedThreadPool(
new DefaultSolrThreadFactory("coreContainerWorkExecutor") );
new DefaultSolrThreadFactory("coreContainerWorkExecutor"));
private final OrderedExecutor replayUpdatesExecutor;
@@ -230,7 +229,7 @@ public class CoreContainer {
private ExecutorService coreContainerAsyncTaskExecutor = ExecutorUtil.newMDCAwareCachedThreadPool("Core Container Async Task");
private enum CoreInitFailedAction { fromleader, none }
private enum CoreInitFailedAction {fromleader, none}
/**
* This method instantiates a new instance of {@linkplain BackupRepository}.
@@ -268,6 +267,7 @@ public class CoreContainer {
/**
* Create a new CoreContainer using system properties to detect the solr home
* directory. The container's cores are not loaded.
*
* @see #load()
*/
public CoreContainer() {
@@ -277,6 +277,7 @@ public class CoreContainer {
/**
* Create a new CoreContainer using the given SolrResourceLoader. The container's
* cores are not loaded.
*
* @param loader the SolrResourceLoader
* @see #load()
*/
@@ -287,6 +288,7 @@ public class CoreContainer {
/**
* Create a new CoreContainer using the given solr home directory. The container's
* cores are not loaded.
*
* @param solrHome a String containing the path to the solr home directory
* @see #load()
*/
@@ -298,6 +300,7 @@ public class CoreContainer {
* Create a new CoreContainer using the given SolrResourceLoader,
* configuration and CoresLocator. The container's cores are
* not loaded.
*
* @param config a ConfigSolr representation of this container's configuration
* @see #load()
*/
@@ -416,7 +419,7 @@ public class CoreContainer {
}
if (pluginClassName != null) {
log.debug("Authentication plugin class obtained from security.json: "+pluginClassName);
log.debug("Authentication plugin class obtained from security.json: " + pluginClassName);
} else if (System.getProperty(AUTHENTICATION_PLUGIN_PROP) != null) {
pluginClassName = System.getProperty(AUTHENTICATION_PLUGIN_PROP);
log.debug("Authentication plugin class obtained from system property '" +
@@ -529,7 +532,8 @@ public class CoreContainer {
/**
* Create a new CoreContainer and load its cores
* @param solrHome the solr home directory
*
* @param solrHome the solr home directory
* @param configFile the file containing this container's configuration
* @return a loaded CoreContainer
*/
@@ -576,7 +580,7 @@ public class CoreContainer {
/**
* Load the cores defined for this CoreContainer
*/
public void load() {
public void load() {
log.debug("Loading cores into CoreContainer [instanceDir={}]", loader.getInstancePath());
// add the sharedLib to the shared resource loader before initializing cfg based plugins
@@ -617,7 +621,7 @@ public class CoreContainer {
hostName = cfg.getNodeName();
zkSys.initZooKeeper(this, solrHome, cfg.getCloudConfig());
if(isZooKeeperAware()) {
if (isZooKeeperAware()) {
pkiAuthenticationPlugin = new PKIAuthenticationPlugin(this, zkSys.getZkController().getNodeName(),
(PublicKeyHandler) containerHandlers.get(PublicKeyHandler.PATH));
TracerConfigurator.loadTracer(loader, cfg.getTracerConfiguratorPluginInfo(), getZkController().getZkStateReader());
@@ -632,8 +636,8 @@ public class CoreContainer {
createHandler(ZK_PATH, ZookeeperInfoHandler.class.getName(), ZookeeperInfoHandler.class);
createHandler(ZK_STATUS_PATH, ZookeeperStatusHandler.class.getName(), ZookeeperStatusHandler.class);
collectionsHandler = createHandler(COLLECTIONS_HANDLER_PATH, cfg.getCollectionsHandlerClass(), CollectionsHandler.class);
infoHandler = createHandler(INFO_HANDLER_PATH, cfg.getInfoHandlerClass(), InfoHandler.class);
coreAdminHandler = createHandler(CORES_HANDLER_PATH, cfg.getCoreAdminHandlerClass(), CoreAdminHandler.class);
infoHandler = createHandler(INFO_HANDLER_PATH, cfg.getInfoHandlerClass(), InfoHandler.class);
coreAdminHandler = createHandler(CORES_HANDLER_PATH, cfg.getCoreAdminHandlerClass(), CoreAdminHandler.class);
configSetsHandler = createHandler(CONFIGSETS_HANDLER_PATH, cfg.getConfigSetsHandlerClass(), ConfigSetsHandler.class);
// metricsHistoryHandler uses metricsHandler, so create it first
@@ -667,18 +671,18 @@ public class CoreContainer {
String registryName = SolrMetricManager.getRegistryName(SolrInfoBean.Group.node);
String metricTag = Integer.toHexString(hashCode());
metricManager.registerGauge(null, registryName, () -> solrCores.getCores().size(),
metricTag,true, "loaded", SolrInfoBean.Category.CONTAINER.toString(), "cores");
metricTag, true, "loaded", SolrInfoBean.Category.CONTAINER.toString(), "cores");
metricManager.registerGauge(null, registryName, () -> solrCores.getLoadedCoreNames().size() - solrCores.getCores().size(),
metricTag,true, "lazy", SolrInfoBean.Category.CONTAINER.toString(), "cores");
metricTag, true, "lazy", SolrInfoBean.Category.CONTAINER.toString(), "cores");
metricManager.registerGauge(null, registryName, () -> solrCores.getAllCoreNames().size() - solrCores.getLoadedCoreNames().size(),
metricTag,true, "unloaded", SolrInfoBean.Category.CONTAINER.toString(), "cores");
metricTag, true, "unloaded", SolrInfoBean.Category.CONTAINER.toString(), "cores");
Path dataHome = cfg.getSolrDataHome() != null ? cfg.getSolrDataHome() : cfg.getCoreRootDirectory();
metricManager.registerGauge(null, registryName, () -> dataHome.toFile().getTotalSpace(),
metricTag,true, "totalSpace", SolrInfoBean.Category.CONTAINER.toString(), "fs");
metricTag, true, "totalSpace", SolrInfoBean.Category.CONTAINER.toString(), "fs");
metricManager.registerGauge(null, registryName, () -> dataHome.toFile().getUsableSpace(),
metricTag,true, "usableSpace", SolrInfoBean.Category.CONTAINER.toString(), "fs");
metricTag, true, "usableSpace", SolrInfoBean.Category.CONTAINER.toString(), "fs");
metricManager.registerGauge(null, registryName, () -> dataHome.toAbsolutePath().toString(),
metricTag,true, "path", SolrInfoBean.Category.CONTAINER.toString(), "fs");
metricTag, true, "path", SolrInfoBean.Category.CONTAINER.toString(), "fs");
metricManager.registerGauge(null, registryName, () -> {
try {
return org.apache.lucene.util.IOUtils.spins(dataHome.toAbsolutePath());
@@ -687,13 +691,13 @@ public class CoreContainer {
return true;
}
},
metricTag,true, "spins", SolrInfoBean.Category.CONTAINER.toString(), "fs");
metricTag, true, "spins", SolrInfoBean.Category.CONTAINER.toString(), "fs");
metricManager.registerGauge(null, registryName, () -> cfg.getCoreRootDirectory().toFile().getTotalSpace(),
metricTag,true, "totalSpace", SolrInfoBean.Category.CONTAINER.toString(), "fs", "coreRoot");
metricTag, true, "totalSpace", SolrInfoBean.Category.CONTAINER.toString(), "fs", "coreRoot");
metricManager.registerGauge(null, registryName, () -> cfg.getCoreRootDirectory().toFile().getUsableSpace(),
metricTag,true, "usableSpace", SolrInfoBean.Category.CONTAINER.toString(), "fs", "coreRoot");
metricTag, true, "usableSpace", SolrInfoBean.Category.CONTAINER.toString(), "fs", "coreRoot");
metricManager.registerGauge(null, registryName, () -> cfg.getCoreRootDirectory().toAbsolutePath().toString(),
metricTag,true, "path", SolrInfoBean.Category.CONTAINER.toString(), "fs", "coreRoot");
metricTag, true, "path", SolrInfoBean.Category.CONTAINER.toString(), "fs", "coreRoot");
metricManager.registerGauge(null, registryName, () -> {
try {
return org.apache.lucene.util.IOUtils.spins(cfg.getCoreRootDirectory().toAbsolutePath());
@@ -702,12 +706,12 @@ public class CoreContainer {
return true;
}
},
metricTag,true, "spins", SolrInfoBean.Category.CONTAINER.toString(), "fs", "coreRoot");
metricTag, true, "spins", SolrInfoBean.Category.CONTAINER.toString(), "fs", "coreRoot");
// add version information
metricManager.registerGauge(null, registryName, () -> this.getClass().getPackage().getSpecificationVersion(),
metricTag,true, "specification", SolrInfoBean.Category.CONTAINER.toString(), "version");
metricTag, true, "specification", SolrInfoBean.Category.CONTAINER.toString(), "version");
metricManager.registerGauge(null, registryName, () -> this.getClass().getPackage().getImplementationVersion(),
metricTag,true, "implementation", SolrInfoBean.Category.CONTAINER.toString(), "version");
metricTag, true, "implementation", SolrInfoBean.Category.CONTAINER.toString(), "version");
SolrFieldCacheBean fieldCacheBean = new SolrFieldCacheBean();
fieldCacheBean.initializeMetrics(metricManager, registryName, metricTag, null);
@@ -1054,8 +1058,8 @@ public class CoreContainer {
}
protected SolrCore registerCore(CoreDescriptor cd, SolrCore core, boolean registerInZk, boolean skipRecovery) {
if( core == null ) {
throw new RuntimeException( "Can not register a null core." );
if (core == null) {
throw new RuntimeException("Can not register a null core.");
}
if (isShutDown) {
@@ -1063,24 +1067,23 @@ public class CoreContainer {
throw new IllegalStateException("This CoreContainer has been closed");
}
SolrCore old = solrCores.putCore(cd, core);
/*
* set both the name of the descriptor and the name of the
* core, since the descriptors name is used for persisting.
*/
/*
* set both the name of the descriptor and the name of the
* core, since the descriptors name is used for persisting.
*/
core.setName(cd.getName());
coreInitFailures.remove(cd.getName());
if( old == null || old == core) {
log.debug( "registering core: " + cd.getName() );
if (old == null || old == core) {
log.debug("registering core: " + cd.getName());
if (registerInZk) {
zkSys.registerInZk(core, false, skipRecovery);
}
return null;
}
else {
log.debug( "replacing core: " + cd.getName() );
} else {
log.debug("replacing core: " + cd.getName());
old.close();
if (registerInZk) {
zkSys.registerInZk(core, false, skipRecovery);
@@ -1091,7 +1094,8 @@ public class CoreContainer {
/**
* Creates a new core, publishing the core state to the cluster
* @param coreName the core name
*
* @param coreName the core name
* @param parameters the core parameters
* @return the newly created core
*/
@@ -1101,9 +1105,10 @@ public class CoreContainer {
/**
* Creates a new core in a specified instance directory, publishing the core state to the cluster
* @param coreName the core name
*
* @param coreName the core name
* @param instancePath the instance directory
* @param parameters the core parameters
* @param parameters the core parameters
* @return the newly created core
*/
public SolrCore create(String coreName, Path instancePath, Map<String, String> parameters, boolean newCollection) {
@@ -1185,27 +1190,26 @@ public class CoreContainer {
*
* @param dcore a core descriptor
* @param publishState publish core state to the cluster if true
* <p>
* WARNING: Any call to this method should be surrounded by a try/finally block
* that calls solrCores.waitAddPendingCoreOps(...) and solrCores.removeFromPendingOps(...)
*
* WARNING: Any call to this method should be surrounded by a try/finally block
* that calls solrCores.waitAddPendingCoreOps(...) and solrCores.removeFromPendingOps(...)
*
* <pre>
* <code>
* try {
* solrCores.waitAddPendingCoreOps(dcore.getName());
* createFromDescriptor(...);
* } finally {
* solrCores.removeFromPendingOps(dcore.getName());
* }
* </code>
* </pre>
*
* Trying to put the waitAddPending... in this method results in Bad Things Happening due to race conditions.
* getCore() depends on getting the core returned _if_ it's in the pending list due to some other thread opening it.
* If the core is not in the pending list and not loaded, then getCore() calls this method. Anything that called
* to check if the core was loaded _or_ in pending ops and, based on the return called createFromDescriptor would
* introduce a race condition, see getCore() for the place it would be a problem
*
* <pre>
* <code>
* try {
* solrCores.waitAddPendingCoreOps(dcore.getName());
* createFromDescriptor(...);
* } finally {
* solrCores.removeFromPendingOps(dcore.getName());
* }
* </code>
* </pre>
* <p>
* Trying to put the waitAddPending... in this method results in Bad Things Happening due to race conditions.
* getCore() depends on getting the core returned _if_ it's in the pending list due to some other thread opening it.
* If the core is not in the pending list and not loaded, then getCore() calls this method. Anything that called
* to check if the core was loaded _or_ in pending ops and, based on the return called createFromDescriptor would
* introduce a race condition, see getCore() for the place it would be a problem
* @return the newly created core
*/
@SuppressWarnings("resource")
@@ -1249,14 +1253,14 @@ public class CoreContainer {
}
solrCores.removeCoreDescriptor(dcore);
final SolrException solrException = new SolrException(ErrorCode.SERVER_ERROR, "Unable to create core [" + dcore.getName() + "]", e);
if(core != null && !core.isClosed())
if (core != null && !core.isClosed())
IOUtils.closeQuietly(core);
throw solrException;
} catch (Throwable t) {
SolrException e = new SolrException(ErrorCode.SERVER_ERROR, "JVM Error creating core [" + dcore.getName() + "]: " + t.getMessage(), t);
coreInitFailures.put(dcore.getName(), new CoreLoadFailure(dcore, e));
solrCores.removeCoreDescriptor(dcore);
if(core != null && !core.isClosed())
if (core != null && !core.isClosed())
IOUtils.closeQuietly(core);
throw t;
} finally {
@@ -1283,18 +1287,13 @@ public class CoreContainer {
* Take action when we failed to create a SolrCore. If error is due to corrupt index, try to recover. Various recovery
* strategies can be specified via system properties "-DCoreInitFailedAction={fromleader, none}"
*
* @see CoreInitFailedAction
*
* @param original
* the problem seen when loading the core the first time.
* @param dcore
* core descriptor for the core to create
* @param coreConfig
* core config for the core to create
* @param original the problem seen when loading the core the first time.
* @param dcore core descriptor for the core to create
* @param coreConfig core config for the core to create
* @return if possible
* @throws SolrException
* rethrows the original exception if we will not attempt to recover, throws a new SolrException with the
* original exception as a suppressed exception if there is a second problem creating the solr core.
* @throws SolrException rethrows the original exception if we will not attempt to recover, throws a new SolrException with the
* original exception as a suppressed exception if there is a second problem creating the solr core.
* @see CoreInitFailedAction
*/
private SolrCore processCoreCreateException(SolrException original, CoreDescriptor dcore, ConfigSet coreConfig) {
// Traverse full chain since CIE may not be root exception
@@ -1382,16 +1381,16 @@ public class CoreContainer {
* Gets the cores that are currently loaded, i.e. cores that have
* 1: loadOnStartup=true and are either not-transient or, if transient, have been loaded and have not been aged out
* 2: loadOnStartup=false and have been loaded but are either non-transient or have not been aged out.
*
* <p>
* Put another way, this will not return any names of cores that are lazily loaded but have not been called for yet
* or are transient and either not loaded or have been swapped out.
*
*/
public Collection<String> getLoadedCoreNames() {
return solrCores.getLoadedCoreNames();
}
/** This method is currently experimental.
/**
* This method is currently experimental.
*
* @return a Collection of the names that a specific core object is mapped to, there are more than one.
*/
@@ -1401,8 +1400,8 @@ public class CoreContainer {
/**
* get a list of all the cores that are currently known, whether currently loaded or not
* @return a list of all the available core names in either permanent or transient cores
*
* @return a list of all the available core names in either permanent or transient cores
*/
public Collection<String> getAllCoreNames() {
return solrCores.getAllCoreNames();
@@ -1420,11 +1419,11 @@ public class CoreContainer {
* can be changed as various SolrCore operations are performed:
* </p>
* <ul>
* <li>Failed attempts to create new SolrCores will add new Exceptions.</li>
* <li>Failed attempts to re-create a SolrCore using a name already contained in this Map will replace the Exception.</li>
* <li>Failed attempts to reload a SolrCore will cause an Exception to be added to this list -- even though the existing SolrCore with that name will continue to be available.</li>
* <li>Successful attempts to re-created a SolrCore using a name already contained in this Map will remove the Exception.</li>
* <li>Registering an existing SolrCore with a name already contained in this Map (ie: ALIAS or SWAP) will remove the Exception.</li>
* <li>Failed attempts to create new SolrCores will add new Exceptions.</li>
* <li>Failed attempts to re-create a SolrCore using a name already contained in this Map will replace the Exception.</li>
* <li>Failed attempts to reload a SolrCore will cause an Exception to be added to this list -- even though the existing SolrCore with that name will continue to be available.</li>
* <li>Successful attempts to re-created a SolrCore using a name already contained in this Map will remove the Exception.</li>
* <li>Registering an existing SolrCore with a name already contained in this Map (ie: ALIAS or SWAP) will remove the Exception.</li>
* </ul>
*/
public Map<String, CoreLoadFailure> getCoreInitFailures() {
@@ -1564,8 +1563,8 @@ public class CoreContainer {
* Swaps two SolrCore descriptors.
*/
public void swap(String n0, String n1) {
if( n0 == null || n1 == null ) {
throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, "Can not swap unnamed cores." );
if (n0 == null || n1 == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Can not swap unnamed cores.");
}
solrCores.swap(n0, n1);
@@ -1576,6 +1575,7 @@ public class CoreContainer {
/**
* Unload a core from this container, leaving all files on disk
*
* @param name the name of the core to unload
*/
public void unload(String name) {
@@ -1585,9 +1585,9 @@ public class CoreContainer {
/**
* Unload a core from this container, optionally removing the core's data and configuration
*
* @param name the name of the core to unload
* @param deleteIndexDir if true, delete the core's index on close
* @param deleteDataDir if true, delete the core's data directory on close
* @param name the name of the core to unload
* @param deleteIndexDir if true, delete the core's index on close
* @param deleteDataDir if true, delete the core's data directory on close
* @param deleteInstanceDir if true, delete the core's instance directory on close
*/
public void unload(String name, boolean deleteIndexDir, boolean deleteDataDir, boolean deleteInstanceDir) {
@@ -1680,6 +1680,7 @@ public class CoreContainer {
/**
* Get the CoreDescriptors for all cores managed by this container
*
* @return a List of CoreDescriptors
*/
public List<CoreDescriptor> getCoreDescriptors() {
@@ -1697,10 +1698,10 @@ public class CoreContainer {
/**
* Gets a core by name and increase its refcount.
*
* @see SolrCore#close()
* @param name the core name
* @return the core if found, null if a SolrCore by this name does not exist
* @exception SolrCoreInitializationException if a SolrCore with this name failed to be initialized
* @throws SolrCoreInitializationException if a SolrCore with this name failed to be initialized
* @see SolrCore#close()
*/
public SolrCore getCore(String name) {
@@ -1742,15 +1743,14 @@ public class CoreContainer {
core = createFromDescriptor(desc, true, false); // This should throw an error if it fails.
}
core.open();
}
finally {
} finally {
solrCores.removeFromPendingOps(name);
}
return core;
}
public BlobRepository getBlobRepository(){
public BlobRepository getBlobRepository() {
return blobRepository;
}
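As the getCore javadoc above stresses, the refcount taken by a successful lookup must be returned through SolrCore#close(); the canonical pattern:
SolrCore core = coreContainer.getCore("techproducts"); // core name illustrative; null if absent
if (core != null) {
    try {
        // ... use the core ...
    } finally {
        core.close(); // releases the refcount taken by getCore
    }
}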
@@ -1771,12 +1771,12 @@ public class CoreContainer {
// ---------------- CoreContainer request handlers --------------
protected <T> T createHandler(String path, String handlerClass, Class<T> clazz) {
T handler = loader.newInstance(handlerClass, clazz, null, new Class[] { CoreContainer.class }, new Object[] { this });
T handler = loader.newInstance(handlerClass, clazz, null, new Class[]{CoreContainer.class}, new Object[]{this});
if (handler instanceof SolrRequestHandler) {
containerHandlers.put(path, (SolrRequestHandler)handler);
containerHandlers.put(path, (SolrRequestHandler) handler);
}
if (handler instanceof SolrMetricProducer) {
((SolrMetricProducer)handler).initializeMetrics(metricManager, SolrInfoBean.Group.node.toString(), metricTag, path);
((SolrMetricProducer) handler).initializeMetrics(metricManager, SolrInfoBean.Group.node.toString(), metricTag, path);
}
return handler;
}
@@ -1789,7 +1789,9 @@ public class CoreContainer {
return collectionsHandler;
}
public HealthCheckHandler getHealthCheckHandler() { return healthCheckHandler; }
public HealthCheckHandler getHealthCheckHandler() {
return healthCheckHandler;
}
public InfoHandler getInfoHandler() {
return infoHandler;
@@ -1820,7 +1822,6 @@ public class CoreContainer {
/**
* Determines whether the core is already loaded or not but does NOT load the core
*
*/
public boolean isLoaded(String name) {
return solrCores.isLoaded(name);
@@ -1834,6 +1835,7 @@ public class CoreContainer {
public void queueCoreToClose(SolrCore coreToClose) {
solrCores.queueCoreToClose(coreToClose);
}
/**
* Gets a solr core descriptor for a core that is not loaded. Note that if the caller calls this on a
* loaded core, the unloaded descriptor will be returned.
@@ -1861,7 +1863,9 @@ public class CoreContainer {
return cfg;
}
/** The default ShardHandlerFactory used to communicate with other solr instances */
/**
* The default ShardHandlerFactory used to communicate with other solr instances
*/
public ShardHandlerFactory getShardHandlerFactory() {
return shardHandlerFactory;
}
@@ -1905,11 +1909,10 @@ public class CoreContainer {
/**
*
* @param cd CoreDescriptor, presumably a deficient one
* @param cd CoreDescriptor, presumably a deficient one
* @param prop The property that needs to be repaired.
* @return true if we were able to successfully persist the repaired coreDescriptor, false otherwise.
*
* <p>
* See SOLR-11503, This can be removed when there's no chance we'll need to upgrade a
* Solr installation created with legacyCloud=true from 6.6.1 through 7.1
*/
@@ -1919,7 +1922,7 @@ public class CoreContainer {
if (CoreDescriptor.CORE_NODE_NAME.equals(prop) == false) {
throw new SolrException(ErrorCode.SERVER_ERROR,
String.format(Locale.ROOT,"The only supported property for repair is currently [%s]",
String.format(Locale.ROOT, "The only supported property for repair is currently [%s]",
CoreDescriptor.CORE_NODE_NAME));
}
@@ -1930,7 +1933,7 @@ public class CoreContainer {
for (Replica rep : coll.getReplicas()) {
if (coreName.equals(rep.getCoreName())) {
log.warn("Core properties file for node {} found with no coreNodeName, attempting to repair with value {}. See SOLR-11503. " +
"This message should only appear if upgrading from collections created Solr 6.6.1 through 7.1.",
"This message should only appear if upgrading from collections created Solr 6.6.1 through 7.1.",
rep.getCoreName(), rep.getName());
cd.getCloudDescriptor().setCoreNodeName(rep.getName());
coresLocator.persist(this, cd);
@@ -2012,7 +2015,7 @@ class CloserThread extends Thread {
// essentially create a single-threaded process anyway.
@Override
public void run() {
while (! container.isShutDown()) {
while (!container.isShutDown()) {
synchronized (solrCores.getModifyLock()) { // need this so we can wait and be awoken.
try {
solrCores.getModifyLock().wait();

File diff suppressed because it is too large.

CollectionsHandler.java

@@ -168,7 +168,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected final CoreContainer coreContainer;
private final CollectionHandlerApi v2Handler ;
private final CollectionHandlerApi v2Handler;
public CollectionsHandler() {
super();
@@ -228,11 +228,11 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
CoreContainer cores = getCoreContainer();
if (cores == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Core container instance missing");
"Core container instance missing");
}
// Make sure that the core is ZKAware
if(!cores.isZooKeeperAware()) {
if (!cores.isZooKeeperAware()) {
throw new SolrException(ErrorCode.BAD_REQUEST,
"Solr instance is not running in SolrCloud mode.");
}
@@ -306,7 +306,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
*/
private static final boolean CHECK_ASYNC_ID_BACK_COMPAT_LOCATIONS = true;
public static long DEFAULT_COLLECTION_OP_TIMEOUT = 180*1000;
public static long DEFAULT_COLLECTION_OP_TIMEOUT = 180 * 1000;
public SolrResponse sendToOCPQueue(ZkNodeProps m) throws KeeperException, InterruptedException {
return sendToOCPQueue(m, DEFAULT_COLLECTION_OP_TIMEOUT);
@@ -319,44 +319,44 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
}
if (m.get(ASYNC) != null) {
String asyncId = m.getStr(ASYNC);
String asyncId = m.getStr(ASYNC);
if (asyncId.equals("-1")) {
throw new SolrException(ErrorCode.BAD_REQUEST, "requestid can not be -1. It is reserved for cleanup purposes.");
}
if (asyncId.equals("-1")) {
throw new SolrException(ErrorCode.BAD_REQUEST, "requestid can not be -1. It is reserved for cleanup purposes.");
}
NamedList<String> r = new NamedList<>();
NamedList<String> r = new NamedList<>();
if (CHECK_ASYNC_ID_BACK_COMPAT_LOCATIONS && (
coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId) ||
coreContainer.getZkController().getOverseerFailureMap().contains(asyncId) ||
coreContainer.getZkController().getOverseerRunningMap().contains(asyncId) ||
overseerCollectionQueueContains(asyncId))) {
// for back compatibility, check in the old places. This can be removed in Solr 9
r.add("error", "Task with the same requestid already exists.");
} else {
if (coreContainer.getZkController().claimAsyncId(asyncId)) {
boolean success = false;
try {
coreContainer.getZkController().getOverseerCollectionQueue()
.offer(Utils.toJSON(m));
success = true;
} finally {
if (!success) {
try {
coreContainer.getZkController().clearAsyncId(asyncId);
} catch (Exception e) {
// let the original exception bubble up
log.error("Unable to release async ID={}", asyncId, e);
SolrZkClient.checkInterrupted(e);
}
}
}
} else {
r.add("error", "Task with the same requestid already exists.");
}
}
r.add(CoreAdminParams.REQUESTID, (String) m.get(ASYNC));
if (CHECK_ASYNC_ID_BACK_COMPAT_LOCATIONS && (
coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId) ||
coreContainer.getZkController().getOverseerFailureMap().contains(asyncId) ||
coreContainer.getZkController().getOverseerRunningMap().contains(asyncId) ||
overseerCollectionQueueContains(asyncId))) {
// for back compatibility, check in the old places. This can be removed in Solr 9
r.add("error", "Task with the same requestid already exists.");
} else {
if (coreContainer.getZkController().claimAsyncId(asyncId)) {
boolean success = false;
try {
coreContainer.getZkController().getOverseerCollectionQueue()
.offer(Utils.toJSON(m));
success = true;
} finally {
if (!success) {
try {
coreContainer.getZkController().clearAsyncId(asyncId);
} catch (Exception e) {
// let the original exception bubble up
log.error("Unable to release async ID={}", asyncId, e);
SolrZkClient.checkInterrupted(e);
}
}
}
} else {
r.add("error", "Task with the same requestid already exists.");
}
}
r.add(CoreAdminParams.REQUESTID, (String) m.get(ASYNC));
return new OverseerSolrResponse(r);
}
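For context, the asyncId validated above is supplied by clients; a SolrJ-side sketch (collection, config, and id values illustrative):
CollectionAdminRequest.Create create =
    CollectionAdminRequest.createCollection("myColl", "myConfig", 2, 1);
create.processAsync("create-myColl-001", solrClient); // "-1" is rejected: reserved for cleanup
// later, poll for completion:
CollectionAdminRequest.requestStatus("create-myColl-001").process(solrClient);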
@@ -393,12 +393,12 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
* Copy prefixed params into a map. There must only be one value for these parameters.
*
* @param params The source of params from which copies should be made
* @param props The map into which param names and values should be copied as keys and values respectively
* @param props The map into which param names and values should be copied as keys and values respectively
* @param prefix The prefix to select.
* @return the map supplied in the props parameter, modified to contain the prefixed params.
*/
private static Map<String, Object> copyPropertiesWithPrefix(SolrParams params, Map<String, Object> props, String prefix) {
Iterator<String> iter = params.getParameterNamesIterator();
Iterator<String> iter = params.getParameterNamesIterator();
while (iter.hasNext()) {
String param = iter.next();
if (param.startsWith(prefix)) {
@@ -665,7 +665,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
}
if (createCollParams.get(COLL_CONF) == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"We require an explicit " + COLL_CONF );
"We require an explicit " + COLL_CONF);
}
// note: could insist on a config name here as well.... or wait to throw at overseer
createCollParams.add(NAME, "TMP_name_TMP_name_TMP"); // just to pass validation
@@ -699,7 +699,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
// the aliases themselves...
rsp.getValues().add("aliases", aliases.getCollectionAliasMap());
// Any properties for the above aliases.
Map<String,Map<String,String>> meta = new LinkedHashMap<>();
Map<String, Map<String, String>> meta = new LinkedHashMap<>();
for (String alias : aliases.getCollectionAliasListMap().keySet()) {
Map<String, String> collectionAliasProperties = aliases.getCollectionAliasProperties(alias);
if (!collectionAliasProperties.isEmpty()) {
@@ -792,8 +792,8 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
DELETE_DATA_DIR,
DELETE_INSTANCE_DIR,
DELETE_METRICS_HISTORY,
COUNT_PROP, REPLICA_PROP,
SHARD_ID_PROP,
COUNT_PROP, REPLICA_PROP,
SHARD_ID_PROP,
ONLY_IF_DOWN);
}),
MIGRATE_OP(MIGRATE, (req, rsp, h) -> {
@@ -877,11 +877,11 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
if (flush) {
Collection<String> completed = zkController.getOverseerCompletedMap().keys();
Collection<String> failed = zkController.getOverseerFailureMap().keys();
for (String asyncId:completed) {
for (String asyncId : completed) {
zkController.getOverseerCompletedMap().remove(asyncId);
zkController.clearAsyncId(asyncId);
}
for (String asyncId:failed) {
for (String asyncId : failed) {
zkController.getOverseerFailureMap().remove(asyncId);
zkController.clearAsyncId(asyncId);
}
@@ -1013,7 +1013,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
MODIFYCOLLECTION_OP(MODIFYCOLLECTION, (req, rsp, h) -> {
Map<String, Object> m = copy(req.getParams(), null, CollectionAdminRequest.MODIFIABLE_COLLECTION_PROPERTIES);
copyPropertiesWithPrefix(req.getParams(), m, COLL_PROP_PREFIX);
if (m.isEmpty()) {
if (m.isEmpty()) {
throw new SolrException(ErrorCode.BAD_REQUEST,
formatString("no supported values provided {0}", CollectionAdminRequest.MODIFIABLE_COLLECTION_PROPERTIES.toString()));
}
@@ -1021,7 +1021,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
addMapObject(m, RULE);
addMapObject(m, SNITCH);
for (String prop : m.keySet()) {
if ("".equals(m.get(prop))) {
if ("".equals(m.get(prop))) {
// set to an empty string is equivalent to removing the property, see SOLR-12507
m.put(prop, null);
}
@@ -1224,17 +1224,17 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
* all prefixed properties as the value. The sub-map keys have the prefix removed.
*
* @param params The solr params from which to extract prefixed properties.
* @param sink The map to add the properties to.
* @param sink The map to add the properties to.
* @param prefix The prefix to identify properties to be extracted
* @return The sink map, or a new map if the sink map was null
*/
private static Map<String, Object> convertPrefixToMap(SolrParams params, Map<String, Object> sink, String prefix) {
Map<String,Object> result = new LinkedHashMap<>();
Iterator<String> iter = params.getParameterNamesIterator();
Map<String, Object> result = new LinkedHashMap<>();
Iterator<String> iter = params.getParameterNamesIterator();
while (iter.hasNext()) {
String param = iter.next();
if (param.startsWith(prefix)) {
result.put(param.substring(prefix.length()+1), params.get(param));
result.put(param.substring(prefix.length() + 1), params.get(param));
}
}
if (sink == null) {
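A worked example of the substring arithmetic above: prefix.length() + 1 also skips the separator dot, so keys in the sub-map lose the prefix (parameter names illustrative):
// given params router.name=compositeId&router.field=route_s&numShards=2
Map<String, Object> routerProps = convertPrefixToMap(params, null, "router");
// routerProps is {name=compositeId, field=route_s}; numShards does not match the prefix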
@@ -1396,7 +1396,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
});
} catch (TimeoutException | InterruptedException e) {
String error = "Timeout waiting for active collection " + collectionName + " with timeout=" + seconds;
String error = "Timeout waiting for active collection " + collectionName + " with timeout=" + seconds;
throw new NotInClusterStateException(ErrorCode.SERVER_ERROR, error);
}
@@ -1462,7 +1462,9 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
// These "copy" methods were once SolrParams.getAll but were moved here as there is no universal way that
// a SolrParams can be represented in a Map; there are various choices.
/**Copy all params to the given map or if the given map is null create a new one */
/**
* Copy all params to the given map or if the given map is null create a new one
*/
static Map<String, Object> copy(SolrParams source, Map<String, Object> sink, Collection<String> paramNames) {
if (sink == null) sink = new LinkedHashMap<>();
for (String param : paramNames) {
@@ -1478,8 +1480,10 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
return sink;
}
/**Copy all params to the given map or if the given map is null create a new one */
static Map<String, Object> copy(SolrParams source, Map<String, Object> sink, String... paramNames){
/**
* Copy all params to the given map or if the given map is null create a new one
*/
static Map<String, Object> copy(SolrParams source, Map<String, Object> sink, String... paramNames) {
return copy(source, sink, paramNames == null ? Collections.emptyList() : Arrays.asList(paramNames));
}
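Usage of these copy helpers is as the javadoc suggests, for example:
// collect just these two params (when present) into a fresh LinkedHashMap
Map<String, Object> props = copy(req.getParams(), null, NAME, COLL_CONF);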

ZkStateReader.java

@@ -87,7 +87,9 @@ public class ZkStateReader implements SolrCloseable {
public static final String STATE_PROP = "state";
// if this flag equals to false and the replica does not exist in cluster state, set state op become no op (default is true)
public static final String FORCE_SET_STATE_PROP = "force_set_state";
/** SolrCore name. */
/**
* SolrCore name.
*/
public static final String CORE_NAME_PROP = "core";
public static final String COLLECTION_PROP = "collection";
public static final String ELECTION_NODE_PROP = "election_node";
@@ -132,7 +134,7 @@ public class ZkStateReader implements SolrCloseable {
public static final String ROLES = "/roles.json";
public static final String CONFIGS_ZKNODE = "/configs";
public final static String CONFIGNAME_PROP="configName";
public final static String CONFIGNAME_PROP = "configName";
public static final String LEGACY_CLOUD = "legacyCloud";
public static final String SAMPLE_PERCENTAGE = "samplePercentage";
@@ -147,33 +149,48 @@ public class ZkStateReader implements SolrCloseable {
public static final String REPLICA_TYPE = "type";
/** A view of the current state of all collections; combines all the different state sources into a single view. */
/**
* A view of the current state of all collections; combines all the different state sources into a single view.
*/
protected volatile ClusterState clusterState;
private static final int GET_LEADER_RETRY_INTERVAL_MS = 50;
private static final int GET_LEADER_RETRY_DEFAULT_TIMEOUT = Integer.parseInt(System.getProperty("zkReaderGetLeaderRetryTimeoutMs", "4000"));;
private static final int GET_LEADER_RETRY_DEFAULT_TIMEOUT = Integer.parseInt(System.getProperty("zkReaderGetLeaderRetryTimeoutMs", "4000"));
;
public static final String LEADER_ELECT_ZKNODE = "leader_elect";
public static final String SHARD_LEADERS_ZKNODE = "leaders";
public static final String ELECTION_NODE = "election";
/** Collections tracked in the legacy (shared) state format, reflects the contents of clusterstate.json. */
/**
* Collections tracked in the legacy (shared) state format, reflects the contents of clusterstate.json.
*/
private Map<String, ClusterState.CollectionRef> legacyCollectionStates = emptyMap();
/** Last seen ZK version of clusterstate.json. */
/**
* Last seen ZK version of clusterstate.json.
*/
private int legacyClusterStateVersion = 0;
/** Collections with format2 state.json, "interesting" and actively watched. */
/**
* Collections with format2 state.json, "interesting" and actively watched.
*/
private final ConcurrentHashMap<String, DocCollection> watchedCollectionStates = new ConcurrentHashMap<>();
/** Collections with format2 state.json, not "interesting" and not actively watched. */
/**
* Collections with format2 state.json, not "interesting" and not actively watched.
*/
private final ConcurrentHashMap<String, LazyCollectionRef> lazyCollectionStates = new ConcurrentHashMap<>();
/** Collection properties being actively watched */
/**
* Collection properties being actively watched
*/
private final ConcurrentHashMap<String, VersionedCollectionProps> watchedCollectionProps = new ConcurrentHashMap<>();
/** Collection properties being actively watched */
/**
* Collection properties being actively watched
*/
private final ConcurrentHashMap<String, PropsWatcher> collectionPropsWatchers = new ConcurrentHashMap<>();
private volatile SortedSet<String> liveNodes = emptySortedSet();
@@ -199,7 +216,9 @@ public class ZkStateReader implements SolrCloseable {
private Set<ClusterPropertiesListener> clusterPropertiesListeners = ConcurrentHashMap.newKeySet();
/** Used to submit notifications to Collection Properties watchers in order **/
/**
* Used to submit notifications to Collection Properties watchers in order
**/
private final ExecutorService collectionPropsNotifications = ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrjNamedThreadFactory("collectionPropsNotifications"));
private static final long LAZY_CACHE_TIME = TimeUnit.NANOSECONDS.convert(STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS);
@@ -208,6 +227,7 @@ public class ZkStateReader implements SolrCloseable {
/**
* Get current {@link AutoScalingConfig}.
*
* @return current configuration from <code>autoscaling.json</code>. NOTE:
* this data is retrieved from ZK on each call.
*/
@@ -217,6 +237,7 @@ public class ZkStateReader implements SolrCloseable {
/**
* Get current {@link AutoScalingConfig}.
*
* @param watcher optional {@link Watcher} to set on a znode to watch for config changes.
* @return current configuration from <code>autoscaling.json</code>. NOTE:
* this data is retrieved from ZK on each call.
@@ -237,7 +258,7 @@ public class ZkStateReader implements SolrCloseable {
return new AutoScalingConfig(map);
}
private static class CollectionWatch <T> {
private static class CollectionWatch<T> {
int coreRefCount = 0;
Set<T> stateWatchers = ConcurrentHashMap.newKeySet();
@@ -359,7 +380,7 @@ public class ZkStateReader implements SolrCloseable {
/**
* Forcibly refresh cluster state from ZK. Do this only to avoid race conditions because it's expensive.
*
* <p>
* It is cheaper to call {@link #forceUpdateCollection(String)} on a single collection if you must.
*
* @lucene.internal
@@ -438,7 +459,9 @@ public class ZkStateReader implements SolrCloseable {
}
/** Refresh the set of live nodes. */
/**
* Refresh the set of live nodes.
*/
public void updateLiveNodes() throws KeeperException, InterruptedException {
refreshLiveNodes(null);
}
@@ -449,7 +472,7 @@ public class ZkStateReader implements SolrCloseable {
if (collection.getZNodeVersion() < version) {
log.debug("Server older than client {}<{}", collection.getZNodeVersion(), version);
DocCollection nu = getCollectionLive(this, coll);
if (nu == null) return -1 ;
if (nu == null) return -1;
if (nu.getZNodeVersion() > collection.getZNodeVersion()) {
if (updateWatchedCollection(coll, nu)) {
synchronized (getUpdateLock()) {
@@ -478,7 +501,7 @@ public class ZkStateReader implements SolrCloseable {
// Sanity check ZK structure.
if (!zkClient.exists(CLUSTER_STATE, true)) {
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
"Cannot connect to cluster at " + zkClient.getZkServerAddress() + ": cluster not found/not ready");
"Cannot connect to cluster at " + zkClient.getZkServerAddress() + ": cluster not found/not ready");
}
// on reconnect of SolrZkClient force refresh and re-add watches.
@@ -652,7 +675,7 @@ public class ZkStateReader implements SolrCloseable {
/**
* Search for any lazy-loadable state format2 collections.
*
* <p>
* A stateFormat=1 collection which is not interesting to us can also
* be put into the {@link #lazyCollectionStates} map here. But that is okay
* because {@link #constructState(Set)} will give priority to collections in the
@@ -761,7 +784,8 @@ public class ZkStateReader implements SolrCloseable {
Stat exists = null;
try {
exists = zkClient.exists(getCollectionPath(collName), null, true);
} catch (Exception e) {}
} catch (Exception e) {
}
if (exists != null && exists.getVersion() == cachedDocCollection.getZNodeVersion()) {
shouldFetch = false;
}
@@ -873,11 +897,13 @@ public class ZkStateReader implements SolrCloseable {
}
public void close() {
this.closed = true;
this.closed = true;
notifications.shutdownNow();
waitLatches.parallelStream().forEach(c -> { c.countDown(); });
waitLatches.parallelStream().forEach(c -> {
c.countDown();
});
ExecutorUtil.shutdownAndAwaitTermination(notifications);
ExecutorUtil.shutdownAndAwaitTermination(collectionPropsNotifications);
@@ -904,6 +930,7 @@ public class ZkStateReader implements SolrCloseable {
}
return null;
}
public Replica getLeader(String collection, String shard) {
if (clusterState != null) {
DocCollection docCollection = clusterState.getCollectionOrNull(collection);
@@ -961,7 +988,7 @@ public class ZkStateReader implements SolrCloseable {
*/
public static String getShardLeadersElectPath(String collection, String shardId) {
return COLLECTIONS_ZKNODE + "/" + collection + "/"
+ LEADER_ELECT_ZKNODE + (shardId != null ? ("/" + shardId + "/" + ELECTION_NODE)
+ LEADER_ELECT_ZKNODE + (shardId != null ? ("/" + shardId + "/" + ELECTION_NODE)
: "");
}
@@ -971,18 +998,18 @@ public class ZkStateReader implements SolrCloseable {
}
public List<ZkCoreNodeProps> getReplicaProps(String collection, String shardId, String thisCoreNodeName,
Replica.State mustMatchStateFilter) {
Replica.State mustMatchStateFilter) {
return getReplicaProps(collection, shardId, thisCoreNodeName, mustMatchStateFilter, null);
}
public List<ZkCoreNodeProps> getReplicaProps(String collection, String shardId, String thisCoreNodeName,
Replica.State mustMatchStateFilter, Replica.State mustNotMatchStateFilter) {
Replica.State mustMatchStateFilter, Replica.State mustNotMatchStateFilter) {
//TODO: We don't need all these getReplicaProps method overloading. Also, it's odd that the default is to return replicas of type TLOG and NRT only
return getReplicaProps(collection, shardId, thisCoreNodeName, mustMatchStateFilter, null, EnumSet.of(Replica.Type.TLOG, Replica.Type.NRT));
return getReplicaProps(collection, shardId, thisCoreNodeName, mustMatchStateFilter, null, EnumSet.of(Replica.Type.TLOG, Replica.Type.NRT));
}
public List<ZkCoreNodeProps> getReplicaProps(String collection, String shardId, String thisCoreNodeName,
Replica.State mustMatchStateFilter, Replica.State mustNotMatchStateFilter, final EnumSet<Replica.Type> acceptReplicaType) {
Replica.State mustMatchStateFilter, Replica.State mustNotMatchStateFilter, final EnumSet<Replica.Type> acceptReplicaType) {
assert thisCoreNodeName != null;
ClusterState clusterState = this.clusterState;
if (clusterState == null) {
@@ -994,15 +1021,15 @@ public class ZkStateReader implements SolrCloseable {
"Could not find collection in zk: " + collection);
}
Map<String,Slice> slices = docCollection.getSlicesMap();
Map<String, Slice> slices = docCollection.getSlicesMap();
Slice replicas = slices.get(shardId);
if (replicas == null) {
throw new ZooKeeperException(ErrorCode.BAD_REQUEST, "Could not find shardId in zk: " + shardId);
}
Map<String,Replica> shardMap = replicas.getReplicasMap();
Map<String, Replica> shardMap = replicas.getReplicasMap();
List<ZkCoreNodeProps> nodes = new ArrayList<>(shardMap.size());
for (Entry<String,Replica> entry : shardMap.entrySet().stream().filter((e)->acceptReplicaType.contains(e.getValue().getType())).collect(Collectors.toList())) {
for (Entry<String, Replica> entry : shardMap.entrySet().stream().filter((e) -> acceptReplicaType.contains(e.getValue().getType())).collect(Collectors.toList())) {
ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue());
String coreNodeName = entry.getValue().getName();
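A typical call against the four-argument overload above (collection and shard names illustrative); per the TODO, the default overloads consider only TLOG and NRT replicas:
List<ZkCoreNodeProps> peers = zkStateReader.getReplicaProps(
    "myColl", "shard1", thisCoreNodeName, Replica.State.ACTIVE);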
@@ -1029,32 +1056,33 @@ public class ZkStateReader implements SolrCloseable {
/**
* Get a cluster property
*
* <p>
* N.B. Cluster properties are updated via ZK watchers, and so may not necessarily
* be completely up-to-date. If you need to get the latest version, then use a
* {@link ClusterProperties} instance.
*
* @param key the property to read
* @param defaultValue a default value to use if no such property exists
* @param <T> the type of the property
* @param key the property to read
* @param defaultValue a default value to use if no such property exists
* @param <T> the type of the property
* @return the cluster property, or a default if the property is not set
*/
@SuppressWarnings("unchecked")
public <T> T getClusterProperty(String key, T defaultValue) {
T value = (T) Utils.getObjectByPath( clusterProperties, false, key);
T value = (T) Utils.getObjectByPath(clusterProperties, false, key);
if (value == null)
return defaultValue;
return value;
}
/**Same as the above but allows a full json path as a list of parts
/**
* Same as the above but allows a full json path as a list of parts
*
* @param keyPath path to the property example ["collectionDefaults", "numShards"]
* @param keyPath path to the property example ["collectionDefaults", "numShards"]
* @param defaultValue a default value to use if no such property exists
* @return the cluster property, or a default if the property is not set
*/
public <T> T getClusterProperty(List<String> keyPath, T defaultValue) {
T value = (T) Utils.getObjectByPath( clusterProperties, false, keyPath);
T value = (T) Utils.getObjectByPath(clusterProperties, false, keyPath);
if (value == null)
return defaultValue;
return value;
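For example, the nested lookup described in the @param above reads as:
Integer numShards = zkStateReader.getClusterProperty(
    Arrays.asList("collectionDefaults", "numShards"), 1); // falls back to 1 when unset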
@ -1062,7 +1090,7 @@ public class ZkStateReader implements SolrCloseable {
/**
* Get all cluster properties for this cluster
   * <p>
* N.B. Cluster properties are updated via ZK watchers, and so may not necessarily
* be completely up-to-date. If you need to get the latest version, then use a
* {@link ClusterProperties} instance.
@ -1090,7 +1118,7 @@ public class ZkStateReader implements SolrCloseable {
this.clusterProperties = ClusterProperties.convertCollectionDefaultsToNestedFormat((Map<String, Object>) Utils.fromJSON(data));
log.debug("Loaded cluster properties: {}", this.clusterProperties);
      for (ClusterPropertiesListener listener : clusterPropertiesListeners) {
listener.onChange(getClusterProperties());
}
return;
@ -1116,7 +1144,7 @@ public class ZkStateReader implements SolrCloseable {
* @return a map representing the key/value properties for the collection.
*/
public Map<String, String> getCollectionProperties(final String collection) {
    return getCollectionProperties(collection, 0);
}
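  // Illustrative usage (a hedged sketch; the collection name is hypothetical):
  //   Map<String, String> props = reader.getCollectionProperties("gettingstarted");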
/**
@ -1126,7 +1154,7 @@ public class ZkStateReader implements SolrCloseable {
* This version of {@code getCollectionProperties} should be used when properties need to be consulted
* frequently in the absence of an active {@link CollectionPropsWatcher}.
*
   * @param collection     The collection for which properties are desired
* @param cacheForMillis The minimum number of milliseconds to maintain a cache for the specified collection's
* properties. Setting a {@code CollectionPropsWatcher} will override this value and retain
* the cache for the life of the watcher. A lack of changes in zookeeper may allow the
@ -1156,7 +1184,7 @@ public class ZkStateReader implements SolrCloseable {
properties = vcp.props;
if (cacheForMillis > 0) {
vcp.cacheUntilNs = untilNs;
          watchedCollectionProps.put(collection, vcp);
} else {
// we're synchronized on watchedCollectionProps and we can only get here if we have found an expired
// vprops above, so it is safe to remove the cached value and let the GC free up some mem a bit sooner.
@ -1174,7 +1202,7 @@ public class ZkStateReader implements SolrCloseable {
private class VersionedCollectionProps {
int zkVersion;
    Map<String, String> props;
long cacheUntilNs = 0;
VersionedCollectionProps(int zkVersion, Map<String, String> props) {
@ -1202,7 +1230,7 @@ public class ZkStateReader implements SolrCloseable {
try {
Stat stat = new Stat();
byte[] data = zkClient.getData(znodePath, watcher, stat, true);
      return new VersionedCollectionProps(stat.getVersion(), (Map<String, String>) Utils.fromJSON(data));
} catch (ClassCastException e) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to parse collection properties for collection " + collection, e);
} catch (KeeperException.NoNodeException e) {
@ -1226,12 +1254,12 @@ public class ZkStateReader implements SolrCloseable {
*/
public ConfigData getSecurityProps(boolean getFresh) {
if (!getFresh) {
      if (securityData == null) return new ConfigData(EMPTY_MAP, -1);
return new ConfigData(securityData.data, securityData.version);
}
try {
Stat stat = new Stat();
      if (getZkClient().exists(SOLR_SECURITY_CONF_PATH, true)) {
final byte[] data = getZkClient().getData(ZkStateReader.SOLR_SECURITY_CONF_PATH, null, stat, true);
return data != null && data.length > 0 ?
new ConfigData((Map<String, Object>) Utils.fromJSON(data), stat.getVersion()) :
@ -1239,9 +1267,9 @@ public class ZkStateReader implements SolrCloseable {
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SolrException(ErrorCode.SERVER_ERROR,"Error reading security properties", e) ;
throw new SolrException(ErrorCode.SERVER_ERROR, "Error reading security properties", e);
} catch (KeeperException e) {
throw new SolrException(ErrorCode.SERVER_ERROR,"Error reading security properties", e) ;
throw new SolrException(ErrorCode.SERVER_ERROR, "Error reading security properties", e);
}
return null;
}
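  // Illustrative usage (a hedged sketch): pass true to bypass the cached copy and
  // re-read security.json from ZooKeeper.
  //   ConfigData securityConf = reader.getSecurityProps(true);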
@ -1250,13 +1278,16 @@ public class ZkStateReader implements SolrCloseable {
* Returns the baseURL corresponding to a given node's nodeName --
* NOTE: does not (currently) imply that the nodeName (or resulting
* baseURL) exists in the cluster.
*
* @lucene.experimental
*/
public String getBaseUrlForNodeName(final String nodeName) {
return Utils.getBaseUrlForNodeName(nodeName, getClusterProperty(URL_SCHEME, "http"));
}
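  // Illustrative mapping (a hedged sketch of the usual nodeName convention, where
  // "_" separates the escaped context path): "host1:8983_solr" becomes
  // "http://host1:8983/solr", or "https://..." when the urlScheme cluster property
  // is set accordingly.
  //   String baseUrl = reader.getBaseUrlForNodeName("host1:8983_solr");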
  /**
   * Watches a single collection's format2 state.json.
   */
class StateWatcher implements Watcher {
private final String coll;
@ -1279,7 +1310,7 @@ public class ZkStateReader implements SolrCloseable {
Set<String> liveNodes = ZkStateReader.this.liveNodes;
log.info("A cluster state change: [{}] for collection [{}] has occurred - updating... (live nodes size: [{}])",
          event, coll, liveNodes.size());
refreshAndWatch();
@ -1310,7 +1341,9 @@ public class ZkStateReader implements SolrCloseable {
}
}
  /**
   * Watches the legacy clusterstate.json.
   */
class LegacyClusterStateWatcher implements Watcher {
@Override
@ -1324,13 +1357,15 @@ public class ZkStateReader implements SolrCloseable {
refreshAndWatch();
}
    /**
     * Must hold {@link #getUpdateLock()} before calling this method.
     */
public void refreshAndWatch() {
try {
refreshLegacyClusterState(this);
} catch (KeeperException.NoNodeException e) {
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
"Cannot connect to cluster at " + zkClient.getZkServerAddress() + ": cluster not found/not ready");
"Cannot connect to cluster at " + zkClient.getZkServerAddress() + ": cluster not found/not ready");
} catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) {
log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage());
} catch (KeeperException e) {
@ -1344,7 +1379,9 @@ public class ZkStateReader implements SolrCloseable {
}
}
  /**
   * Watches collection properties.
   */
class PropsWatcher implements Watcher {
private final String coll;
private long watchUntilNs;
@ -1356,11 +1393,11 @@ public class ZkStateReader implements SolrCloseable {
PropsWatcher(String coll, long forMillis) {
this.coll = coll;
      watchUntilNs = System.nanoTime() + TimeUnit.NANOSECONDS.convert(forMillis, TimeUnit.MILLISECONDS);
}
public PropsWatcher renew(long forMillis) {
      watchUntilNs = System.nanoTime() + TimeUnit.NANOSECONDS.convert(forMillis, TimeUnit.MILLISECONDS);
return this;
}
@ -1428,7 +1465,9 @@ public class ZkStateReader implements SolrCloseable {
}
}
  /**
   * Watches /collections children.
   */
class CollectionsChildWatcher implements Watcher {
@Override
@ -1448,7 +1487,9 @@ public class ZkStateReader implements SolrCloseable {
}
}
    /**
     * Must hold {@link #getUpdateLock()} before calling this method.
     */
public void refreshAndWatch() {
try {
refreshCollectionList(this);
@ -1465,7 +1506,9 @@ public class ZkStateReader implements SolrCloseable {
}
}
  /**
   * Watches the live_nodes and syncs changes.
   */
class LiveNodeWatcher implements Watcher {
@Override
@ -1531,7 +1574,7 @@ public class ZkStateReader implements SolrCloseable {
}
public static String getCollectionPathRoot(String coll) {
return COLLECTIONS_ZKNODE+"/"+coll;
return COLLECTIONS_ZKNODE + "/" + coll;
}
public static String getCollectionPath(String coll) {
@ -1541,14 +1584,13 @@ public class ZkStateReader implements SolrCloseable {
/**
* Notify this reader that a local Core is a member of a collection, and so that collection
* state should be watched.
   * <p>
* Not a public API. This method should only be called from ZkController.
   * <p>
* The number of cores per-collection is tracked, and adding multiple cores from the same
* collection does not increase the number of watches.
*
* @param collection the collection that the core is a member of
* @see ZkStateReader#unregisterCore(String)
*/
public void registerCore(String collection) {
@ -1568,9 +1610,9 @@ public class ZkStateReader implements SolrCloseable {
/**
* Notify this reader that a local core that is a member of a collection has been closed.
   * <p>
* Not a public API. This method should only be called from ZkController.
   * <p>
* If no cores are registered for a collection, and there are no {@link CollectionStateWatcher}s
* for that collection either, the collection watch will be removed.
*
@ -1619,7 +1661,7 @@ public class ZkStateReader implements SolrCloseable {
*/
public void registerCollectionStateWatcher(String collection, CollectionStateWatcher stateWatcher) {
final DocCollectionAndLiveNodesWatcherWrapper wrapper
        = new DocCollectionAndLiveNodesWatcherWrapper(collection, stateWatcher);
registerDocCollectionWatcher(collection, wrapper);
registerLiveNodesListener(wrapper);
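    // Illustrative usage (a hedged sketch; the watcher returns true once it wants
    // to be removed, false to keep watching):
    //   reader.registerCollectionStateWatcher("gettingstarted",
    //       (liveNodes, coll) -> coll != null && coll.getSlice("shard1") != null);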
@ -1673,14 +1715,14 @@ public class ZkStateReader implements SolrCloseable {
* instead
* </p>
*
   * @param collection the collection to watch
   * @param wait how long to wait
   * @param unit the units of the wait parameter
   * @param predicate the predicate to call on state changes
   * @throws InterruptedException on interrupt
   * @throws TimeoutException on timeout
   * @see #waitForState(String, long, TimeUnit, Predicate)
   * @see #registerCollectionStateWatcher
*/
public void waitForState(final String collection, long wait, TimeUnit unit, CollectionStatePredicate predicate)
throws InterruptedException, TimeoutException {
@ -1707,8 +1749,7 @@ public class ZkStateReader implements SolrCloseable {
if (!latch.await(wait, unit))
throw new TimeoutException("Timeout waiting to see state for collection=" + collection + " :" + docCollection.get());
    } finally {
removeCollectionStateWatcher(collection, watcher);
waitLatches.remove(latch);
}
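    // Illustrative usage (a hedged sketch; collection name and predicate are
    // assumptions): block up to 30 seconds until every slice has a leader.
    //   reader.waitForState("gettingstarted", 30, TimeUnit.SECONDS,
    //       (liveNodes, coll) -> coll != null && coll.getSlices().stream()
    //           .allMatch(slice -> slice.getLeader() != null));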
@ -1727,7 +1768,7 @@ public class ZkStateReader implements SolrCloseable {
* @param unit the units of the wait parameter
* @param predicate the predicate to call on state changes
* @throws InterruptedException on interrupt
   * @throws TimeoutException on timeout
*/
public void waitForState(final String collection, long wait, TimeUnit unit, Predicate<DocCollection> predicate)
throws InterruptedException, TimeoutException {
@ -1754,8 +1795,7 @@ public class ZkStateReader implements SolrCloseable {
if (!latch.await(wait, unit))
throw new TimeoutException("Timeout waiting to see state for collection=" + collection + " :" + docCollection.get());
    } finally {
removeDocCollectionWatcher(collection, watcher);
waitLatches.remove(latch);
}
@ -1767,11 +1807,12 @@ public class ZkStateReader implements SolrCloseable {
* Note that the predicate may be called again even after it has returned true, so
* implementors should avoid changing state within the predicate call itself.
* </p>
   *
   * @param wait how long to wait
   * @param unit the units of the wait parameter
   * @param predicate the predicate to call on state changes
* @throws InterruptedException on interrupt
   * @throws TimeoutException on timeout
*/
public void waitForLiveNodes(long wait, TimeUnit unit, LiveNodesPredicate predicate)
throws InterruptedException, TimeoutException {
@ -1798,8 +1839,7 @@ public class ZkStateReader implements SolrCloseable {
if (!latch.await(wait, unit))
throw new TimeoutException("Timeout waiting for live nodes, currently they are: " + getClusterState().getLiveNodes());
    } finally {
removeLiveNodesListener(listener);
waitLatches.remove(latch);
}
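    // Illustrative usage (a hedged sketch): wait up to one minute for the cluster
    // to report at least three live nodes.
    //   reader.waitForLiveNodes(60, TimeUnit.SECONDS,
    //       (oldNodes, newNodes) -> newNodes.size() >= 3);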
@ -1813,13 +1853,13 @@ public class ZkStateReader implements SolrCloseable {
* collection.
* </p>
*
   * @param collection the collection
   * @param watcher the watcher
   * @see #registerCollectionStateWatcher
*/
public void removeCollectionStateWatcher(String collection, CollectionStateWatcher watcher) {
final DocCollectionAndLiveNodesWatcherWrapper wrapper
        = new DocCollectionAndLiveNodesWatcherWrapper(collection, watcher);
removeDocCollectionWatcher(collection, wrapper);
removeLiveNodesListener(wrapper);
@ -1832,9 +1872,9 @@ public class ZkStateReader implements SolrCloseable {
* collection.
* </p>
*
   * @param collection the collection
   * @param watcher the watcher
   * @see #registerDocCollectionWatcher
*/
public void removeDocCollectionWatcher(String collection, DocCollectionWatcher watcher) {
AtomicBoolean reconstructState = new AtomicBoolean(false);
@ -1967,8 +2007,7 @@ public class ZkStateReader implements SolrCloseable {
}
try {
notifications.submit(new Notification(collection, collectionState));
      } catch (RejectedExecutionException e) {
if (closed == false) {
log.error("Couldn't run collection notifications for {}", collection, e);
}
@ -2011,7 +2050,9 @@ public class ZkStateReader implements SolrCloseable {
// Aliases related
//
  /**
   * Access to the {@link Aliases}.
   */
public final AliasesManager aliasesManager = new AliasesManager();
/**
@ -2039,7 +2080,7 @@ public class ZkStateReader implements SolrCloseable {
* per instance of ZkStateReader. Normally it will not be useful to create a new instance since
* this watcher automatically re-registers itself every time it is updated.
*/
  public class AliasesManager implements Watcher { // the holder is a Zk watcher
    // note: as of this writing, this class is very generic. Is it useful to use for other ZK-managed things?
private final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@ -2247,9 +2288,9 @@ public class ZkStateReader implements SolrCloseable {
public boolean equals(Object other) {
if (other instanceof DocCollectionAndLiveNodesWatcherWrapper) {
DocCollectionAndLiveNodesWatcherWrapper that
            = (DocCollectionAndLiveNodesWatcherWrapper) other;
return this.collectionName.equals(that.collectionName)
&& this.delegate.equals(that.delegate);
&& this.delegate.equals(that.delegate);
}
return false;
}
@ -2263,7 +2304,7 @@ public class ZkStateReader implements SolrCloseable {
@Override
public boolean onStateChanged(DocCollection collectionState) {
final boolean result = delegate.onStateChanged(ZkStateReader.this.liveNodes,
          collectionState);
if (result) {
// it might be a while before live nodes changes, so proactively remove ourselves
removeLiveNodesListener(this);