SOLR-13439 - Adds ability to locally cache collection properties for a specified duration.

This commit is contained in:
Gus Heck 2019-06-11 14:36:04 -04:00
parent 613106b7e9
commit fbae72c4cc
2 changed files with 226 additions and 25 deletions

View File

@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.overseer;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.cloud.CollectionProperties;
import org.apache.solr.common.cloud.ZkStateReader;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@LuceneTestCase.Slow
@SolrTestCaseJ4.SuppressSSL
public class ZkCollectionPropsCachingTest extends SolrCloudTestCase {
//
// NOTE: This class can only have one test because our test for caching is to nuke the SolrZkClient to
// verify that a cached load is going to hit the cache, not try to talk to zk. Any other ZK related test
// method in this class will fail if it runs after testReadWriteCached, so don't add one! :)
//
private String collectionName;
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@BeforeClass
public static void setupClass() throws Exception {
Boolean useLegacyCloud = rarely();
log.info("Using legacyCloud?: {}", useLegacyCloud);
configureCluster(4)
.withProperty(ZkStateReader.LEGACY_CLOUD, String.valueOf(useLegacyCloud))
.addConfig("conf", configset("cloud-minimal"))
.configure();
}
@Before
@Override
public void setUp() throws Exception {
super.setUp();
collectionName = "CollectionPropsTest" + System.nanoTime();
CollectionAdminRequest.Create request = CollectionAdminRequest.createCollection(collectionName, "conf", 2, 2);
CollectionAdminResponse response = request.process(cluster.getSolrClient());
assertTrue("Unable to create collection: " + response.toString(), response.isSuccess());
}
@Test
public void testReadWriteCached() throws InterruptedException, IOException {
ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
CollectionProperties collectionProps = new CollectionProperties(zkClient());
collectionProps.setCollectionProperty(collectionName, "property1", "value1");
checkValue("property1", "value1"); //Should be no cache, so the change should take effect immediately
zkStateReader.getCollectionProperties(collectionName,9000);
zkStateReader.getZkClient().close();
assertFalse(zkStateReader.isClosed());
checkValue("property1", "value1"); //Should be cached, so the change should not try to hit zk
Thread.sleep(10000); // test the timeout feature
try {
checkValue("property1", "value1"); //Should not be cached anymore
fail("cache should have expired, prev line should throw an exception trying to access zookeeper after closed");
} catch (Exception e) {
// expected, because we killed the client in zkStateReader.
}
}
private void checkValue(String propertyName, String expectedValue) throws InterruptedException {
final Object value = cluster.getSolrClient().getZkStateReader().getCollectionProperties(collectionName).get(propertyName);
assertEquals("Unexpected value for collection property: " + propertyName, expectedValue, value);
}
}

View File

@ -35,6 +35,7 @@ import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -171,6 +172,9 @@ public class ZkStateReader implements SolrCloseable {
/** Collection properties being actively watched */
private final ConcurrentHashMap<String, VersionedCollectionProps> watchedCollectionProps = new ConcurrentHashMap<>();
/** Collection properties being actively watched */
private final ConcurrentHashMap<String, PropsWatcher> collectionPropsWatchers = new ConcurrentHashMap<>();
private volatile SortedSet<String> liveNodes = emptySortedSet();
private volatile Map<String, Object> clusterProperties = Collections.emptyMap();
@ -183,7 +187,8 @@ public class ZkStateReader implements SolrCloseable {
private ConcurrentHashMap<String, CollectionWatch<CollectionStateWatcher>> collectionWatches = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, CollectionWatch<CollectionPropsWatcher>> collectionPropsWatches = new ConcurrentHashMap<>();
// named this observers so there's less confusion between CollectionPropsWatcher map and the PropsWatcher map.
private ConcurrentHashMap<String, CollectionWatch<CollectionPropsWatcher>> collectionPropsObservers = new ConcurrentHashMap<>();
private Set<CloudCollectionsListener> cloudCollectionsListeners = ConcurrentHashMap.newKeySet();
@ -198,6 +203,8 @@ public class ZkStateReader implements SolrCloseable {
private static final long LAZY_CACHE_TIME = TimeUnit.NANOSECONDS.convert(STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS);
private Future<?> collectionPropsCacheCleaner; // only kept to identify if the cleaner has already been started.
/**
* Get current {@link AutoScalingConfig}.
* @return current configuration from <code>autoscaling.json</code>. NOTE:
@ -484,8 +491,8 @@ public class ZkStateReader implements SolrCloseable {
securityData = getSecurityProps(true);
}
collectionPropsWatches.forEach((k,v) -> {
new PropsWatcher(k).refreshAndWatch(true);
collectionPropsObservers.forEach((k, v) -> {
collectionPropsWatchers.computeIfAbsent(k, PropsWatcher::new).refreshAndWatch(true);
});
}
@ -1094,17 +1101,60 @@ public class ZkStateReader implements SolrCloseable {
/**
* Get collection properties for a given collection. If the collection is watched, simply return it from the cache,
* otherwise fetch it directly from zookeeper.
* otherwise fetch it directly from zookeeper. This is a convenience for {@code getCollectionProperties(collection,0)}
*
* @param collection the collection for which properties are desired
* @return a map representing the key/value properties for the collection.
*/
public Map<String, String> getCollectionProperties(final String collection) {
return getCollectionProperties(collection,0);
}
/**
* Get and cache collection properties for a given collection. If the collection is watched, or still cached
* simply return it from the cache, otherwise fetch it directly from zookeeper and retain the value for at
* least cacheForMillis milliseconds. Cached properties are watched in zookeeper and updated automatically.
* This version of {@code getCollectionProperties} should be used when properties need to be consulted
* frequently in the absence of an active {@link CollectionPropsWatcher}.
*
* @param collection The collection for which properties are desired
* @param cacheForMillis The minimum number of milliseconds to maintain a cache for the specified collection's
* properties. Setting a {@code CollectionPropsWatcher} will override this value and retain
* the cache for the life of the watcher. A lack of changes in zookeeper may allow the
* caching to remain for a greater duration up to the cycle time of {@link CacheCleaner}.
* Passing zero for this value will explicitly remove the cached copy if and only if it is
* due to expire and no watch exists. Any positive value will extend the expiration time
* if required.
* @return a map representing the key/value properties for the collection.
*/
public Map<String, String> getCollectionProperties(final String collection, long cacheForMillis) {
synchronized (watchedCollectionProps) { // making decisions based on the result of a get...
Watcher watcher = null;
if (cacheForMillis > 0) {
watcher = collectionPropsWatchers.compute(collection,
(c, w) -> w == null ? new PropsWatcher(c, cacheForMillis) : w.renew(cacheForMillis));
}
VersionedCollectionProps vprops = watchedCollectionProps.get(collection);
Map<String, String> properties = vprops != null ? vprops.props : null;
if (properties == null) {
boolean haveUnexpiredProps = vprops != null && vprops.cacheUntilNs > System.nanoTime();
long untilNs = System.nanoTime() + TimeUnit.NANOSECONDS.convert(cacheForMillis, TimeUnit.MILLISECONDS);
Map<String, String> properties;
if (haveUnexpiredProps) {
properties = vprops.props;
vprops.cacheUntilNs = Math.max(vprops.cacheUntilNs, untilNs);
} else {
try {
// todo: maybe we want to store/watch since if someone's calling this it's probably going to get called again?
// Not storing the value in watchedCollectionProps, because it can gat stale, since we have no watcher set.
properties = fetchCollectionProperties(collection, null ).props;
VersionedCollectionProps vcp = fetchCollectionProperties(collection, watcher);
properties = vcp.props;
if (cacheForMillis > 0) {
vcp.cacheUntilNs = untilNs;
watchedCollectionProps.put(collection,vcp);
} else {
// we're synchronized on watchedCollectionProps and we can only get here if we have found an expired
// vprops above, so it is safe to remove the cached value and let the GC free up some mem a bit sooner.
if (!collectionPropsObservers.containsKey(collection)) {
watchedCollectionProps.remove(collection);
}
}
} catch (Exception e) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Error reading collection properties", SolrZkClient.checkInterrupted(e));
}
@ -1114,13 +1164,14 @@ public class ZkStateReader implements SolrCloseable {
}
private class VersionedCollectionProps {
public VersionedCollectionProps(int zkVersion, Map<String, String> props) {
int zkVersion;
Map<String,String> props;
long cacheUntilNs = 0;
VersionedCollectionProps(int zkVersion, Map<String, String> props) {
this.zkVersion = zkVersion;
this.props = props;
}
int zkVersion;
Map<String,String> props;
}
static String getCollectionPropsPath(final String collection) {
@ -1130,6 +1181,14 @@ public class ZkStateReader implements SolrCloseable {
@SuppressWarnings("unchecked")
private VersionedCollectionProps fetchCollectionProperties(String collection, Watcher watcher) throws KeeperException, InterruptedException {
final String znodePath = getCollectionPropsPath(collection);
// lazy init cache cleaner once we know someone is using collection properties.
if (collectionPropsCacheCleaner == null) {
synchronized (this) { // There can be only one! :)
if (collectionPropsCacheCleaner == null) {
collectionPropsCacheCleaner = notifications.submit(new CacheCleaner());
}
}
}
while (true) {
try {
Stat stat = new Stat();
@ -1279,9 +1338,21 @@ public class ZkStateReader implements SolrCloseable {
/** Watches collection properties */
class PropsWatcher implements Watcher {
private final String coll;
private long watchUntilNs;
PropsWatcher(String coll) {
this.coll = coll;
watchUntilNs = 0;
}
PropsWatcher(String coll, long forMillis) {
this.coll = coll;
watchUntilNs = System.nanoTime() + TimeUnit.NANOSECONDS.convert(forMillis,TimeUnit.MILLISECONDS);
}
public PropsWatcher renew(long forMillis) {
watchUntilNs = System.nanoTime() + TimeUnit.NANOSECONDS.convert(forMillis,TimeUnit.MILLISECONDS);
return this;
}
@Override
@ -1291,7 +1362,8 @@ public class ZkStateReader implements SolrCloseable {
return;
}
if (!collectionPropsWatches.containsKey(coll)) {
boolean expired = System.nanoTime() > watchUntilNs;
if (!collectionPropsObservers.containsKey(coll) && expired) {
// No one can be notified of the change, we can ignore it and "unset" the watch
log.debug("Ignoring property change for collection {}", coll);
return;
@ -1320,6 +1392,19 @@ public class ZkStateReader implements SolrCloseable {
if (notifyWatchers) {
notifyPropsWatchers(coll, properties);
}
if (vcp.zkVersion == -1 && existingVcp != null) { // Collection DELETE detected
// We should not be caching a collection that has been deleted.
watchedCollectionProps.remove(coll);
// core ref counting not relevant here, don't need canRemove(), we just sent
// a notification of an empty set of properties, no reason to watch what doesn't exist.
collectionPropsObservers.remove(coll);
// This is the one time we know it's safe to throw this out. We just failed to set the watch
// due to an NoNodeException, so it isn't held by ZK and can't re-set itself due to an update.
collectionPropsWatchers.remove(coll);
}
}
}
} catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) {
@ -1708,7 +1793,7 @@ public class ZkStateReader implements SolrCloseable {
public void registerCollectionPropsWatcher(final String collection, CollectionPropsWatcher propsWatcher) {
AtomicBoolean watchSet = new AtomicBoolean(false);
collectionPropsWatches.compute(collection, (k, v) -> {
collectionPropsObservers.compute(collection, (k, v) -> {
if (v == null) {
v = new CollectionWatch<>();
watchSet.set(true);
@ -1718,12 +1803,12 @@ public class ZkStateReader implements SolrCloseable {
});
if (watchSet.get()) {
new PropsWatcher(collection).refreshAndWatch(false);
collectionPropsWatchers.computeIfAbsent(collection, PropsWatcher::new).refreshAndWatch(false);
}
}
public void removeCollectionPropsWatcher(String collection, CollectionPropsWatcher watcher) {
collectionPropsWatches.compute(collection, (k, v) -> {
collectionPropsObservers.compute(collection, (k, v) -> {
if (v == null)
return null;
v.stateWatchers.remove(watcher);
@ -1983,30 +2068,46 @@ public class ZkStateReader implements SolrCloseable {
private class PropsNotification implements Runnable {
final String collection;
final Map<String, String> collectionProperties;
private final String collection;
private final Map<String, String> collectionProperties;
private final List<CollectionPropsWatcher> watchers = new ArrayList<>();
private PropsNotification(String collection, Map<String, String> collectionProperties) {
this.collection = collection;
this.collectionProperties = collectionProperties;
}
@Override
public void run() {
List<CollectionPropsWatcher> watchers = new ArrayList<>();
collectionPropsWatches.compute(collection, (k, v) -> {
// guarantee delivery of notification regardless of what happens to collectionPropsObservers
// while we wait our turn in the executor by capturing the list on creation.
collectionPropsObservers.compute(collection, (k, v) -> {
if (v == null)
return null;
watchers.addAll(v.stateWatchers);
return v;
});
}
@Override
public void run() {
for (CollectionPropsWatcher watcher : watchers) {
if (watcher.onStateChanged(collectionProperties)) {
removeCollectionPropsWatcher(collection, watcher);
}
}
}
}
private class CacheCleaner implements Runnable {
public void run() {
while (!Thread.interrupted()) {
try {
Thread.sleep(60000);
} catch (InterruptedException e) {
// Executor shutdown will send us an interrupt
break;
}
watchedCollectionProps.entrySet().removeIf(entry ->
entry.getValue().cacheUntilNs < System.nanoTime() && !collectionPropsObservers.containsKey(entry.getKey()));
}
}
}
}