From fbae72c4cc86c83ee1b6fd5fd4a6b1f6cdb391ae Mon Sep 17 00:00:00 2001 From: Gus Heck Date: Tue, 11 Jun 2019 14:36:04 -0400 Subject: [PATCH] SOLR-13439 - Adds ability to locally cache collection properties for a specified duration. --- .../ZkCollectionPropsCachingTest.java | 100 ++++++++++++ .../solr/common/cloud/ZkStateReader.java | 151 +++++++++++++++--- 2 files changed, 226 insertions(+), 25 deletions(-) create mode 100644 solr/core/src/test/org/apache/solr/cloud/overseer/ZkCollectionPropsCachingTest.java diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkCollectionPropsCachingTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkCollectionPropsCachingTest.java new file mode 100644 index 00000000000..a765ada9594 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkCollectionPropsCachingTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cloud.overseer; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; + +import org.apache.lucene.util.LuceneTestCase; +import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.response.CollectionAdminResponse; +import org.apache.solr.cloud.SolrCloudTestCase; +import org.apache.solr.common.cloud.CollectionProperties; +import org.apache.solr.common.cloud.ZkStateReader; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@LuceneTestCase.Slow +@SolrTestCaseJ4.SuppressSSL +public class ZkCollectionPropsCachingTest extends SolrCloudTestCase { + // + // NOTE: This class can only have one test because our test for caching is to nuke the SolrZkClient to + // verify that a cached load is going to hit the cache, not try to talk to zk. Any other ZK related test + // method in this class will fail if it runs after testReadWriteCached, so don't add one! :) + // + private String collectionName; + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + @BeforeClass + public static void setupClass() throws Exception { + Boolean useLegacyCloud = rarely(); + log.info("Using legacyCloud?: {}", useLegacyCloud); + + configureCluster(4) + .withProperty(ZkStateReader.LEGACY_CLOUD, String.valueOf(useLegacyCloud)) + .addConfig("conf", configset("cloud-minimal")) + .configure(); + } + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + + collectionName = "CollectionPropsTest" + System.nanoTime(); + + CollectionAdminRequest.Create request = CollectionAdminRequest.createCollection(collectionName, "conf", 2, 2); + CollectionAdminResponse response = request.process(cluster.getSolrClient()); + assertTrue("Unable to create collection: " + response.toString(), response.isSuccess()); + } + + @Test + public void testReadWriteCached() throws InterruptedException, IOException { + ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader(); + + CollectionProperties collectionProps = new CollectionProperties(zkClient()); + + collectionProps.setCollectionProperty(collectionName, "property1", "value1"); + checkValue("property1", "value1"); //Should be no cache, so the change should take effect immediately + + zkStateReader.getCollectionProperties(collectionName,9000); + zkStateReader.getZkClient().close(); + assertFalse(zkStateReader.isClosed()); + checkValue("property1", "value1"); //Should be cached, so the change should not try to hit zk + + Thread.sleep(10000); // test the timeout feature + try { + checkValue("property1", "value1"); //Should not be cached anymore + fail("cache should have expired, prev line should throw an exception trying to access zookeeper after closed"); + } catch (Exception e) { + // expected, because we killed the client in zkStateReader. + } + } + + private void checkValue(String propertyName, String expectedValue) throws InterruptedException { + final Object value = cluster.getSolrClient().getZkStateReader().getCollectionProperties(collectionName).get(propertyName); + assertEquals("Unexpected value for collection property: " + propertyName, expectedValue, value); + } + + + +} diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java index f00cdffa05a..e09f89ec7f3 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -35,6 +35,7 @@ import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -171,6 +172,9 @@ public class ZkStateReader implements SolrCloseable { /** Collection properties being actively watched */ private final ConcurrentHashMap watchedCollectionProps = new ConcurrentHashMap<>(); + /** Collection properties being actively watched */ + private final ConcurrentHashMap collectionPropsWatchers = new ConcurrentHashMap<>(); + private volatile SortedSet liveNodes = emptySortedSet(); private volatile Map clusterProperties = Collections.emptyMap(); @@ -183,7 +187,8 @@ public class ZkStateReader implements SolrCloseable { private ConcurrentHashMap> collectionWatches = new ConcurrentHashMap<>(); - private ConcurrentHashMap> collectionPropsWatches = new ConcurrentHashMap<>(); + // named this observers so there's less confusion between CollectionPropsWatcher map and the PropsWatcher map. + private ConcurrentHashMap> collectionPropsObservers = new ConcurrentHashMap<>(); private Set cloudCollectionsListeners = ConcurrentHashMap.newKeySet(); @@ -198,6 +203,8 @@ public class ZkStateReader implements SolrCloseable { private static final long LAZY_CACHE_TIME = TimeUnit.NANOSECONDS.convert(STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS); + private Future collectionPropsCacheCleaner; // only kept to identify if the cleaner has already been started. + /** * Get current {@link AutoScalingConfig}. * @return current configuration from autoscaling.json. NOTE: @@ -484,8 +491,8 @@ public class ZkStateReader implements SolrCloseable { securityData = getSecurityProps(true); } - collectionPropsWatches.forEach((k,v) -> { - new PropsWatcher(k).refreshAndWatch(true); + collectionPropsObservers.forEach((k, v) -> { + collectionPropsWatchers.computeIfAbsent(k, PropsWatcher::new).refreshAndWatch(true); }); } @@ -1094,17 +1101,60 @@ public class ZkStateReader implements SolrCloseable { /** * Get collection properties for a given collection. If the collection is watched, simply return it from the cache, - * otherwise fetch it directly from zookeeper. + * otherwise fetch it directly from zookeeper. This is a convenience for {@code getCollectionProperties(collection,0)} + * + * @param collection the collection for which properties are desired + * @return a map representing the key/value properties for the collection. */ public Map getCollectionProperties(final String collection) { + return getCollectionProperties(collection,0); + } + + /** + * Get and cache collection properties for a given collection. If the collection is watched, or still cached + * simply return it from the cache, otherwise fetch it directly from zookeeper and retain the value for at + * least cacheForMillis milliseconds. Cached properties are watched in zookeeper and updated automatically. + * This version of {@code getCollectionProperties} should be used when properties need to be consulted + * frequently in the absence of an active {@link CollectionPropsWatcher}. + * + * @param collection The collection for which properties are desired + * @param cacheForMillis The minimum number of milliseconds to maintain a cache for the specified collection's + * properties. Setting a {@code CollectionPropsWatcher} will override this value and retain + * the cache for the life of the watcher. A lack of changes in zookeeper may allow the + * caching to remain for a greater duration up to the cycle time of {@link CacheCleaner}. + * Passing zero for this value will explicitly remove the cached copy if and only if it is + * due to expire and no watch exists. Any positive value will extend the expiration time + * if required. + * @return a map representing the key/value properties for the collection. + */ + public Map getCollectionProperties(final String collection, long cacheForMillis) { synchronized (watchedCollectionProps) { // making decisions based on the result of a get... + Watcher watcher = null; + if (cacheForMillis > 0) { + watcher = collectionPropsWatchers.compute(collection, + (c, w) -> w == null ? new PropsWatcher(c, cacheForMillis) : w.renew(cacheForMillis)); + } VersionedCollectionProps vprops = watchedCollectionProps.get(collection); - Map properties = vprops != null ? vprops.props : null; - if (properties == null) { + boolean haveUnexpiredProps = vprops != null && vprops.cacheUntilNs > System.nanoTime(); + long untilNs = System.nanoTime() + TimeUnit.NANOSECONDS.convert(cacheForMillis, TimeUnit.MILLISECONDS); + Map properties; + if (haveUnexpiredProps) { + properties = vprops.props; + vprops.cacheUntilNs = Math.max(vprops.cacheUntilNs, untilNs); + } else { try { - // todo: maybe we want to store/watch since if someone's calling this it's probably going to get called again? - // Not storing the value in watchedCollectionProps, because it can gat stale, since we have no watcher set. - properties = fetchCollectionProperties(collection, null ).props; + VersionedCollectionProps vcp = fetchCollectionProperties(collection, watcher); + properties = vcp.props; + if (cacheForMillis > 0) { + vcp.cacheUntilNs = untilNs; + watchedCollectionProps.put(collection,vcp); + } else { + // we're synchronized on watchedCollectionProps and we can only get here if we have found an expired + // vprops above, so it is safe to remove the cached value and let the GC free up some mem a bit sooner. + if (!collectionPropsObservers.containsKey(collection)) { + watchedCollectionProps.remove(collection); + } + } } catch (Exception e) { throw new SolrException(ErrorCode.SERVER_ERROR, "Error reading collection properties", SolrZkClient.checkInterrupted(e)); } @@ -1114,13 +1164,14 @@ public class ZkStateReader implements SolrCloseable { } private class VersionedCollectionProps { - public VersionedCollectionProps(int zkVersion, Map props) { + int zkVersion; + Map props; + long cacheUntilNs = 0; + + VersionedCollectionProps(int zkVersion, Map props) { this.zkVersion = zkVersion; this.props = props; } - - int zkVersion; - Map props; } static String getCollectionPropsPath(final String collection) { @@ -1130,6 +1181,14 @@ public class ZkStateReader implements SolrCloseable { @SuppressWarnings("unchecked") private VersionedCollectionProps fetchCollectionProperties(String collection, Watcher watcher) throws KeeperException, InterruptedException { final String znodePath = getCollectionPropsPath(collection); + // lazy init cache cleaner once we know someone is using collection properties. + if (collectionPropsCacheCleaner == null) { + synchronized (this) { // There can be only one! :) + if (collectionPropsCacheCleaner == null) { + collectionPropsCacheCleaner = notifications.submit(new CacheCleaner()); + } + } + } while (true) { try { Stat stat = new Stat(); @@ -1279,9 +1338,21 @@ public class ZkStateReader implements SolrCloseable { /** Watches collection properties */ class PropsWatcher implements Watcher { private final String coll; + private long watchUntilNs; PropsWatcher(String coll) { this.coll = coll; + watchUntilNs = 0; + } + + PropsWatcher(String coll, long forMillis) { + this.coll = coll; + watchUntilNs = System.nanoTime() + TimeUnit.NANOSECONDS.convert(forMillis,TimeUnit.MILLISECONDS); + } + + public PropsWatcher renew(long forMillis) { + watchUntilNs = System.nanoTime() + TimeUnit.NANOSECONDS.convert(forMillis,TimeUnit.MILLISECONDS); + return this; } @Override @@ -1291,7 +1362,8 @@ public class ZkStateReader implements SolrCloseable { return; } - if (!collectionPropsWatches.containsKey(coll)) { + boolean expired = System.nanoTime() > watchUntilNs; + if (!collectionPropsObservers.containsKey(coll) && expired) { // No one can be notified of the change, we can ignore it and "unset" the watch log.debug("Ignoring property change for collection {}", coll); return; @@ -1320,6 +1392,19 @@ public class ZkStateReader implements SolrCloseable { if (notifyWatchers) { notifyPropsWatchers(coll, properties); } + if (vcp.zkVersion == -1 && existingVcp != null) { // Collection DELETE detected + + // We should not be caching a collection that has been deleted. + watchedCollectionProps.remove(coll); + + // core ref counting not relevant here, don't need canRemove(), we just sent + // a notification of an empty set of properties, no reason to watch what doesn't exist. + collectionPropsObservers.remove(coll); + + // This is the one time we know it's safe to throw this out. We just failed to set the watch + // due to an NoNodeException, so it isn't held by ZK and can't re-set itself due to an update. + collectionPropsWatchers.remove(coll); + } } } } catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) { @@ -1708,7 +1793,7 @@ public class ZkStateReader implements SolrCloseable { public void registerCollectionPropsWatcher(final String collection, CollectionPropsWatcher propsWatcher) { AtomicBoolean watchSet = new AtomicBoolean(false); - collectionPropsWatches.compute(collection, (k, v) -> { + collectionPropsObservers.compute(collection, (k, v) -> { if (v == null) { v = new CollectionWatch<>(); watchSet.set(true); @@ -1718,12 +1803,12 @@ public class ZkStateReader implements SolrCloseable { }); if (watchSet.get()) { - new PropsWatcher(collection).refreshAndWatch(false); + collectionPropsWatchers.computeIfAbsent(collection, PropsWatcher::new).refreshAndWatch(false); } } public void removeCollectionPropsWatcher(String collection, CollectionPropsWatcher watcher) { - collectionPropsWatches.compute(collection, (k, v) -> { + collectionPropsObservers.compute(collection, (k, v) -> { if (v == null) return null; v.stateWatchers.remove(watcher); @@ -1983,30 +2068,46 @@ public class ZkStateReader implements SolrCloseable { private class PropsNotification implements Runnable { - final String collection; - final Map collectionProperties; + private final String collection; + private final Map collectionProperties; + private final List watchers = new ArrayList<>(); private PropsNotification(String collection, Map collectionProperties) { this.collection = collection; this.collectionProperties = collectionProperties; - } - - @Override - public void run() { - List watchers = new ArrayList<>(); - collectionPropsWatches.compute(collection, (k, v) -> { + // guarantee delivery of notification regardless of what happens to collectionPropsObservers + // while we wait our turn in the executor by capturing the list on creation. + collectionPropsObservers.compute(collection, (k, v) -> { if (v == null) return null; watchers.addAll(v.stateWatchers); return v; }); + } + + @Override + public void run() { for (CollectionPropsWatcher watcher : watchers) { if (watcher.onStateChanged(collectionProperties)) { removeCollectionPropsWatcher(collection, watcher); } } } + } + private class CacheCleaner implements Runnable { + public void run() { + while (!Thread.interrupted()) { + try { + Thread.sleep(60000); + } catch (InterruptedException e) { + // Executor shutdown will send us an interrupt + break; + } + watchedCollectionProps.entrySet().removeIf(entry -> + entry.getValue().cacheUntilNs < System.nanoTime() && !collectionPropsObservers.containsKey(entry.getKey())); + } + } } }