From 5a974860fa83408a86ca64b417f3111b037da7eb Mon Sep 17 00:00:00 2001 From: Chris Hostetter Date: Mon, 17 Jun 2019 09:59:43 -0700 Subject: [PATCH] SOLR-13490: Fix CollectionStateWatcher/CollectionStatePredicate based APIs in ZkStateReader and CloudSolrClient to be triggered on liveNode changes. Also add Predicate equivilents for callers that don't care about liveNodes. --- solr/CHANGES.txt | 4 + .../org/apache/solr/cloud/ZkController.java | 14 +- .../api/collections/CreateCollectionCmd.java | 2 +- .../api/collections/DeleteCollectionCmd.java | 2 +- .../cloud/api/collections/DeleteShardCmd.java | 2 +- .../OverseerCollectionMessageHandler.java | 2 +- .../apache/solr/cloud/DeleteReplicaTest.java | 6 +- .../TestWaitForStateWithJettyShutdowns.java | 155 ++++++++++ .../common/cloud/ZkStateReaderAccessor.java | 2 +- .../solrj/impl/BaseCloudSolrClient.java | 68 +++- .../cloud/CollectionStatePredicate.java | 10 +- .../common/cloud/CollectionStateWatcher.java | 9 +- .../common/cloud/DocCollectionWatcher.java | 40 +++ .../solr/common/cloud/ZkStateReader.java | 202 ++++++++++-- .../cloud/TestCollectionStateWatchers.java | 180 +++++++---- .../cloud/TestDocCollectionWatcher.java | 291 ++++++++++++++++++ .../solr/cloud/AbstractDistribZkTestBase.java | 6 +- .../cloud/AbstractFullDistribZkTestBase.java | 4 +- .../solr/cloud/MiniSolrCloudCluster.java | 2 +- 19 files changed, 889 insertions(+), 112 deletions(-) create mode 100644 solr/core/src/test/org/apache/solr/cloud/TestWaitForStateWithJettyShutdowns.java create mode 100644 solr/solrj/src/java/org/apache/solr/common/cloud/DocCollectionWatcher.java create mode 100644 solr/solrj/src/test/org/apache/solr/common/cloud/TestDocCollectionWatcher.java diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 2f7ab1eb51a..061d2b30d0c 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -159,6 +159,10 @@ Bug Fixes * SOLR-13333: unleashing terms.ttf from terms.list when distrib=false (Munendra S N via Mikhail Khludnev) +* SOLR-13490: Fix CollectionStateWatcher/CollectionStatePredicate based APIs in ZkStateReader and + CloudSolrClient to be triggered on liveNode changes. Also add Predicate equivilents + for callers that don't care about liveNodes. (hossman) + Other Changes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index 8e90dd8b3e3..af298c5dde1 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -68,12 +68,12 @@ import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.cloud.BeforeReconnect; import org.apache.solr.common.cloud.ClusterState; -import org.apache.solr.common.cloud.CollectionStateWatcher; import org.apache.solr.common.cloud.ConnectionManager; import org.apache.solr.common.cloud.DefaultConnectionStrategy; import org.apache.solr.common.cloud.DefaultZkACLProvider; import org.apache.solr.common.cloud.DefaultZkCredentialsProvider; import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.DocCollectionWatcher; import org.apache.solr.common.cloud.LiveNodesListener; import org.apache.solr.common.cloud.OnReconnect; import org.apache.solr.common.cloud.Replica; @@ -1052,7 +1052,7 @@ public class ZkController implements Closeable { CountDownLatch latch = new CountDownLatch(collectionsWithLocalReplica.size()); for (String collectionWithLocalReplica : collectionsWithLocalReplica) { - zkStateReader.registerCollectionStateWatcher(collectionWithLocalReplica, (liveNodes, collectionState) -> { + zkStateReader.registerDocCollectionWatcher(collectionWithLocalReplica, (collectionState) -> { if (collectionState == null) return false; boolean foundStates = true; for (CoreDescriptor coreDescriptor : cc.getCoreDescriptors()) { @@ -1194,7 +1194,7 @@ public class ZkController implements Closeable { // check replica's existence in clusterstate first try { zkStateReader.waitForState(collection, Overseer.isLegacy(zkStateReader) ? 60000 : 100, - TimeUnit.MILLISECONDS, (liveNodes, collectionState) -> getReplicaOrNull(collectionState, shardId, coreZkNodeName) != null); + TimeUnit.MILLISECONDS, (collectionState) -> getReplicaOrNull(collectionState, shardId, coreZkNodeName) != null); } catch (TimeoutException e) { throw new SolrException(ErrorCode.SERVER_ERROR, "Error registering SolrCore, timeout waiting for replica present in clusterstate"); } @@ -1301,7 +1301,7 @@ public class ZkController implements Closeable { // make sure we have an update cluster state right away zkStateReader.forceUpdateCollection(collection); // the watcher is added to a set so multiple calls of this method will left only one watcher - zkStateReader.registerCollectionStateWatcher(cloudDesc.getCollectionName(), + zkStateReader.registerDocCollectionWatcher(cloudDesc.getCollectionName(), new UnloadCoreOnDeletedWatcher(coreZkNodeName, shardId, desc.getName())); return shardId; } finally { @@ -1845,7 +1845,7 @@ public class ZkController implements Closeable { AtomicReference errorMessage = new AtomicReference<>(); AtomicReference collectionState = new AtomicReference<>(); try { - zkStateReader.waitForState(cd.getCollectionName(), 10, TimeUnit.SECONDS, (n, c) -> { + zkStateReader.waitForState(cd.getCollectionName(), 10, TimeUnit.SECONDS, (c) -> { collectionState.set(c); if (c == null) return false; @@ -2545,7 +2545,7 @@ public class ZkController implements Closeable { }; } - private class UnloadCoreOnDeletedWatcher implements CollectionStateWatcher { + private class UnloadCoreOnDeletedWatcher implements DocCollectionWatcher { String coreNodeName; String shard; String coreName; @@ -2558,7 +2558,7 @@ public class ZkController implements Closeable { @Override // synchronized due to SOLR-11535 - public synchronized boolean onStateChanged(Set liveNodes, DocCollection collectionState) { + public synchronized boolean onStateChanged(DocCollection collectionState) { if (getCoreContainer().getCoreDescriptor(coreName) == null) return true; boolean replicaRemoved = getReplicaOrNull(collectionState, shard, coreNodeName) == null; diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java index dfd5a218750..372ae533090 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java @@ -321,7 +321,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd CollectionAdminParams.COLOCATED_WITH, collectionName); ocmh.overseer.offerStateUpdate(Utils.toJSON(props)); try { - zkStateReader.waitForState(withCollection, 5, TimeUnit.SECONDS, (liveNodes, collectionState) -> collectionName.equals(collectionState.getStr(COLOCATED_WITH))); + zkStateReader.waitForState(withCollection, 5, TimeUnit.SECONDS, (collectionState) -> collectionName.equals(collectionState.getStr(COLOCATED_WITH))); } catch (TimeoutException e) { log.warn("Timed out waiting to see the " + COLOCATED_WITH + " property set on collection: " + withCollection); // maybe the overseer queue is backed up, we don't want to fail the create request diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java index 2054258bbeb..6c0b147b61d 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java @@ -134,7 +134,7 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd ocmh.overseer.offerStateUpdate(Utils.toJSON(m)); // wait for a while until we don't see the collection - zkStateReader.waitForState(collection, 60, TimeUnit.SECONDS, (liveNodes, collectionState) -> collectionState == null); + zkStateReader.waitForState(collection, 60, TimeUnit.SECONDS, (collectionState) -> collectionState == null); // we can delete any remaining unique aliases if (!aliasReferences.isEmpty()) { diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java index e38aa4aee31..c5c8e99c099 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java @@ -134,7 +134,7 @@ public class DeleteShardCmd implements OverseerCollectionMessageHandler.Cmd { ZkStateReader zkStateReader = ocmh.zkStateReader; ocmh.overseer.offerStateUpdate(Utils.toJSON(m)); - zkStateReader.waitForState(collectionName, 45, TimeUnit.SECONDS, (l, c) -> c.getSlice(sliceId) == null); + zkStateReader.waitForState(collectionName, 45, TimeUnit.SECONDS, (c) -> c.getSlice(sliceId) == null); log.info("Successfully deleted collection: " + collectionName + ", shard: " + sliceId); } catch (SolrException e) { diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java index 81407c66156..d8be1f2ccf9 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java @@ -416,7 +416,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler, boolean waitForCoreNodeGone(String collectionName, String shard, String replicaName, int timeoutms) throws InterruptedException { try { - zkStateReader.waitForState(collectionName, timeoutms, TimeUnit.MILLISECONDS, (n, c) -> { + zkStateReader.waitForState(collectionName, timeoutms, TimeUnit.MILLISECONDS, (c) -> { if (c == null) return true; Slice slice = c.getSlice(shard); diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java index 62a891a826e..1050b4cd4d3 100644 --- a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java @@ -35,8 +35,8 @@ import org.apache.solr.client.solrj.request.CoreStatus; import org.apache.solr.cloud.overseer.OverseerAction; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrInputDocument; -import org.apache.solr.common.cloud.CollectionStateWatcher; import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.DocCollectionWatcher; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.ZkNodeProps; @@ -115,7 +115,7 @@ public class DeleteReplicaTest extends SolrCloudTestCase { JettySolrRunner replicaJetty = cluster.getReplicaJetty(replica); ZkStateReaderAccessor accessor = new ZkStateReaderAccessor(replicaJetty.getCoreContainer().getZkController().getZkStateReader()); - Set watchers = accessor.getStateWatchers(collectionName); + Set watchers = accessor.getStateWatchers(collectionName); CollectionAdminRequest.deleteReplica(collectionName, shard.getName(), replica.getName()) .process(cluster.getSolrClient()); waitForState("Expected replica " + replica.getName() + " to have been removed", collectionName, (n, c) -> { @@ -221,7 +221,7 @@ public class DeleteReplicaTest extends SolrCloudTestCase { Replica replica = getRandomReplica(shard); JettySolrRunner replicaJetty = cluster.getReplicaJetty(replica); ZkStateReaderAccessor accessor = new ZkStateReaderAccessor(replicaJetty.getCoreContainer().getZkController().getZkStateReader()); - Set watchers = accessor.getStateWatchers(collectionName); + Set watchers = accessor.getStateWatchers(collectionName); ZkNodeProps m = new ZkNodeProps( Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(), diff --git a/solr/core/src/test/org/apache/solr/cloud/TestWaitForStateWithJettyShutdowns.java b/solr/core/src/test/org/apache/solr/cloud/TestWaitForStateWithJettyShutdowns.java new file mode 100644 index 00000000000..4e21fb37faa --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/TestWaitForStateWithJettyShutdowns.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cloud; + +import java.lang.invoke.MethodHandles; + +import java.util.Set; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.client.solrj.embedded.JettyConfig; +import org.apache.solr.client.solrj.embedded.JettySolrRunner; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.common.cloud.CollectionStatePredicate; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.util.ExecutorUtil; +import org.apache.solr.util.DefaultSolrThreadFactory; + +import static org.apache.solr.cloud.SolrCloudTestCase.clusterShape; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestWaitForStateWithJettyShutdowns extends SolrTestCaseJ4 { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + public void testWaitForStateAfterShutDown() throws Exception { + final String col_name = "test_col"; + final MiniSolrCloudCluster cluster = new MiniSolrCloudCluster + (1, createTempDir(), JettyConfig.builder().build()); + try { + log.info("Create our collection"); + CollectionAdminRequest.createCollection(col_name, "_default", 1, 1).process(cluster.getSolrClient()); + + log.info("Sanity check that our collection has come online"); + cluster.getSolrClient().waitForState(col_name, 30, TimeUnit.SECONDS, clusterShape(1, 1)); + + log.info("Shutdown 1 node"); + final JettySolrRunner nodeToStop = cluster.getJettySolrRunner(0); + nodeToStop.stop(); + log.info("Wait to confirm our node is fully shutdown"); + cluster.waitForJettyToStop(nodeToStop); + + // now that we're confident that node has stoped, check if a waitForState + // call will detect the missing replica -- shouldn't need long wait times (we know it's down)... + log.info("Now check if waitForState will recognize we already have the exepcted state"); + cluster.getSolrClient().waitForState(col_name, 500, TimeUnit.MILLISECONDS, clusterShape(1, 0)); + + + } finally { + cluster.shutdown(); + } + } + + public void testWaitForStateBeforeShutDown() throws Exception { + final String col_name = "test_col"; + final ExecutorService executor = ExecutorUtil.newMDCAwareFixedThreadPool + (1, new DefaultSolrThreadFactory("background_executor")); + final MiniSolrCloudCluster cluster = new MiniSolrCloudCluster + (1, createTempDir(), JettyConfig.builder().build()); + try { + log.info("Create our collection"); + CollectionAdminRequest.createCollection(col_name, "_default", 1, 1).process(cluster.getSolrClient()); + + log.info("Sanity check that our collection has come online"); + cluster.getSolrClient().waitForState(col_name, 30, TimeUnit.SECONDS, + SolrCloudTestCase.clusterShape(1, 1)); + + + // HACK implementation detail... + // + // we know that in the current implementation, waitForState invokes the predicate twice + // independently of the current state of the collection and/or wether the predicate succeeds. + // If this implementation detail changes, (ie: so that it's only invoked once) + // then this number needs to change -- but the test fundementally depends on the implementation + // calling the predicate at least once, which should also be neccessary for any future impl + // (to verify that it didn't "miss" the state change when creating the watcher) + final CountDownLatch latch = new CountDownLatch(2); + + final Future backgroundWaitForState = executor.submit + (() -> { + try { + cluster.getSolrClient().waitForState(col_name, 180, TimeUnit.SECONDS, + new LatchCountingPredicateWrapper(latch, + clusterShape(1, 0))); + } catch (Exception e) { + log.error("background thread got exception", e); + throw new RuntimeException(e); + } + return; + }, null); + + log.info("Awaiting latch..."); + if (! latch.await(120, TimeUnit.SECONDS)) { + fail("timed out Waiting a ridiculous amount of time for the waitForState latch -- did impl change?"); + } + + log.info("Shutdown 1 node"); + final JettySolrRunner nodeToStop = cluster.getJettySolrRunner(0); + nodeToStop.stop(); + log.info("Wait to confirm our node is fully shutdown"); + cluster.waitForJettyToStop(nodeToStop); + + // now that we're confident that node has stoped, check if a waitForState + // call will detect the missing replica -- shouldn't need long wait times... + log.info("Checking Future result to see if waitForState finished successfully"); + try { + backgroundWaitForState.get(); + } catch (ExecutionException e) { + log.error("background waitForState exception", e); + throw e; + } + + } finally { + ExecutorUtil.shutdownAndAwaitTermination(executor); + cluster.shutdown(); + } + } + + public final class LatchCountingPredicateWrapper implements CollectionStatePredicate { + private final CountDownLatch latch; + private final CollectionStatePredicate inner; + public LatchCountingPredicateWrapper(final CountDownLatch latch, final CollectionStatePredicate inner) { + this.latch = latch; + this.inner = inner; + } + public boolean matches(Set liveNodes, DocCollection collectionState) { + final boolean result = inner.matches(liveNodes, collectionState); + log.info("Predicate called: result={}, (pre)latch={}, liveNodes={}, state={}", + result, latch.getCount(), liveNodes, collectionState); + latch.countDown(); + return result; + } + } +} diff --git a/solr/core/src/test/org/apache/solr/common/cloud/ZkStateReaderAccessor.java b/solr/core/src/test/org/apache/solr/common/cloud/ZkStateReaderAccessor.java index 0853ee062ce..b40a7a25c25 100644 --- a/solr/core/src/test/org/apache/solr/common/cloud/ZkStateReaderAccessor.java +++ b/solr/core/src/test/org/apache/solr/common/cloud/ZkStateReaderAccessor.java @@ -28,7 +28,7 @@ public class ZkStateReaderAccessor { this.zkStateReader = zkStateReader; } - public Set getStateWatchers(String collection) { + public Set getStateWatchers(String collection) { return zkStateReader.getStateWatchers(collection); } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java index 02fbb703fcb..82645fd5beb 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java @@ -42,6 +42,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Predicate; import java.util.function.Supplier; import org.apache.solr.client.solrj.ResponseParser; @@ -62,6 +63,7 @@ import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.CollectionStatePredicate; import org.apache.solr.common.cloud.CollectionStateWatcher; import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.DocCollectionWatcher; import org.apache.solr.common.cloud.DocRouter; import org.apache.solr.common.cloud.ImplicitDocRouter; import org.apache.solr.common.cloud.Replica; @@ -362,11 +364,21 @@ public abstract class BaseCloudSolrClient extends SolrClient { } /** - * Block until a collection state matches a predicate, or a timeout + * Block until a CollectionStatePredicate returns true, or the wait times out * + *

* Note that the predicate may be called again even after it has returned true, so * implementors should avoid changing state within the predicate call itself. + *

* + *

+ * This implementation utilizes {@link CollectionStateWatcher} internally. + * Callers that don't care about liveNodes are encouraged to use a {@link DocCollection} {@link Predicate} + * instead + *

+ * + * @see #waitForState(String, long, TimeUnit, Predicate) + * @see #registerCollectionStateWatcher * @param collection the collection to watch * @param wait how long to wait * @param unit the units of the wait parameter @@ -379,14 +391,45 @@ public abstract class BaseCloudSolrClient extends SolrClient { getClusterStateProvider().connect(); assertZKStateProvider().zkStateReader.waitForState(collection, wait, unit, predicate); } + /** + * Block until a Predicate returns true, or the wait times out + * + *

+ * Note that the predicate may be called again even after it has returned true, so + * implementors should avoid changing state within the predicate call itself. + *

+ * + * @see #registerDocCollectionWatcher + * @param collection the collection to watch + * @param wait how long to wait + * @param unit the units of the wait parameter + * @param predicate a {@link Predicate} to test against the {@link DocCollection} + * @throws InterruptedException on interrupt + * @throws TimeoutException on timeout + */ + public void waitForState(String collection, long wait, TimeUnit unit, Predicate predicate) + throws InterruptedException, TimeoutException { + getClusterStateProvider().connect(); + assertZKStateProvider().zkStateReader.waitForState(collection, wait, unit, predicate); + } /** * Register a CollectionStateWatcher to be called when the cluster state for a collection changes + * or the set of live nodes changes. * - * Note that the watcher is unregistered after it has been called once. To make a watcher persistent, - * it should re-register itself in its {@link CollectionStateWatcher#onStateChanged(Set, DocCollection)} - * call + *

+ * The Watcher will automatically be removed when it's + * onStateChanged returns true + *

* + *

+ * This implementation utilizes {@link ZkStateReader#registerCollectionStateWatcher} internally. + * Callers that don't care about liveNodes are encouraged to use a {@link DocCollectionWatcher} + * instead + *

+ * + * @see #registerDocCollectionWatcher(String, DocCollectionWatcher) + * @see ZkStateReader#registerCollectionStateWatcher * @param collection the collection to watch * @param watcher a watcher that will be called when the state changes */ @@ -394,6 +437,23 @@ public abstract class BaseCloudSolrClient extends SolrClient { getClusterStateProvider().connect(); assertZKStateProvider().zkStateReader.registerCollectionStateWatcher(collection, watcher); } + + /** + * Register a DocCollectionWatcher to be called when the cluster state for a collection changes. + * + *

+ * The Watcher will automatically be removed when it's + * onStateChanged returns true + *

+ * + * @see ZkStateReader#registerDocCollectionWatcher + * @param collection the collection to watch + * @param watcher a watcher that will be called when the state changes + */ + public void registerDocCollectionWatcher(String collection, DocCollectionWatcher watcher) { + getClusterStateProvider().connect(); + assertZKStateProvider().zkStateReader.registerDocCollectionWatcher(collection, watcher); + } private NamedList directUpdate(AbstractUpdateRequest request, String collection) throws SolrServerException { UpdateRequest updateRequest = (UpdateRequest) request; diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java b/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java index 37b00d7c28e..a91a499b12c 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java @@ -19,23 +19,27 @@ package org.apache.solr.common.cloud; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; /** - * Interface to determine if a collection state matches a required state + * Interface to determine if a set of liveNodes and a collection's state matches some expecatations. * * @see ZkStateReader#waitForState(String, long, TimeUnit, CollectionStatePredicate) + * @see ZkStateReader#waitForState(String, long, TimeUnit, Predicate) */ public interface CollectionStatePredicate { /** - * Check the collection state matches a required state - * + * Check if the set of liveNodes and the collection state matches a required state + *

* Note that both liveNodes and collectionState should be consulted to determine * the overall state. + *

* * @param liveNodes the current set of live nodes * @param collectionState the latest collection state, or null if the collection * does not exist + * @return true if the input matches the requirements of this predicate */ boolean matches(Set liveNodes, DocCollection collectionState); diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStateWatcher.java b/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStateWatcher.java index d75823c0c89..63bfaf9b3d2 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStateWatcher.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStateWatcher.java @@ -21,21 +21,24 @@ import java.util.Set; /** * Callback registered with {@link ZkStateReader#registerCollectionStateWatcher(String, CollectionStateWatcher)} - * and called whenever the collection state changes. + * and called whenever there is a change in the collection state or in the list of liveNodes. + * + * @see DocCollectionWatcher */ public interface CollectionStateWatcher { /** - * Called when the collection we are registered against has a change of state. + * Called when either the collection we are registered against has a change of state or there is a change to the live nodes of our collection. * + *

* Note that, due to the way Zookeeper watchers are implemented, a single call may be * the result of several state changes. Also, multiple calls to this method can be made * with the same state, ie. without any new updates. + *

* * @param liveNodes the set of live nodes * @param collectionState the new collection state (may be null if the collection has been * deleted) - * * @return true if the watcher should be removed */ boolean onStateChanged(Set liveNodes, DocCollection collectionState); diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollectionWatcher.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollectionWatcher.java new file mode 100644 index 00000000000..0d65cfecc38 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollectionWatcher.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.common.cloud; + +/** + * Callback registered with {@link ZkStateReader#registerDocCollectionWatcher(String, DocCollectionWatcher)} + * and called whenever the DocCollection changes. + */ +public interface DocCollectionWatcher { + + /** + * Called when the collection we are registered against has a change of state. + * + *

+ * Note that, due to the way Zookeeper watchers are implemented, a single call may be + * the result of several state changes. Also, multiple calls to this method can be made + * with the same state, ie. without any new updates. + *

+ * + * @param collection the new collection state (may be null if the collection has been deleted) + * @return true if the watcher should be removed + */ + boolean onStateChanged(DocCollection collection); + +} diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java index 3af08d8e608..cd72203eacb 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -41,6 +41,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; import java.util.function.UnaryOperator; import java.util.stream.Collectors; @@ -185,7 +186,7 @@ public class ZkStateReader implements SolrCloseable { private final Runnable securityNodeListener; - private ConcurrentHashMap> collectionWatches = new ConcurrentHashMap<>(); + private ConcurrentHashMap> collectionWatches = new ConcurrentHashMap<>(); // named this observers so there's less confusion between CollectionPropsWatcher map and the PropsWatcher map. private ConcurrentHashMap> collectionPropsObservers = new ConcurrentHashMap<>(); @@ -590,7 +591,7 @@ public class ZkStateReader implements SolrCloseable { notifyCloudCollectionsListeners(); for (String collection : changedCollections) { - notifyStateWatchers(liveNodes, collection, clusterState.getCollectionOrNull(collection)); + notifyStateWatchers(collection, clusterState.getCollectionOrNull(collection)); } } @@ -1598,9 +1599,46 @@ public class ZkStateReader implements SolrCloseable { } /** - * Register a CollectionStateWatcher to be called when the state of a collection changes + * Register a CollectionStateWatcher to be called when the state of a collection changes + * or the set of live nodes changes. + * + *

+ * The Watcher will automatically be removed when it's + * onStateChanged returns true + *

+ * + *

+ * This is method is just syntactic sugar for registering both a {@link DocCollectionWatcher} and + * a {@link LiveNodesListener}. Callers that only care about one or the other (but not both) are + * encouraged to use the more specific methods register methods as it may reduce the number of + * ZooKeeper watchers needed, and reduce the amount of network/cpu used. + *

+ * + * @see #registerDocCollectionWatcher + * @see #registerLiveNodesListener */ public void registerCollectionStateWatcher(String collection, CollectionStateWatcher stateWatcher) { + final DocCollectionAndLiveNodesWatcherWrapper wrapper + = new DocCollectionAndLiveNodesWatcherWrapper(collection, stateWatcher); + + registerDocCollectionWatcher(collection, wrapper); + registerLiveNodesListener(wrapper); + + DocCollection state = clusterState.getCollectionOrNull(collection); + if (stateWatcher.onStateChanged(liveNodes, state) == true) { + removeCollectionStateWatcher(collection, stateWatcher); + } + } + + /** + * Register a DocCollectionWatcher to be called when the state of a collection changes + * + *

+ * The Watcher will automatically be removed when it's + * onStateChanged returns true + *

+ */ + public void registerDocCollectionWatcher(String collection, DocCollectionWatcher stateWatcher) { AtomicBoolean watchSet = new AtomicBoolean(false); collectionWatches.compute(collection, (k, v) -> { if (v == null) { @@ -1616,17 +1654,27 @@ public class ZkStateReader implements SolrCloseable { } DocCollection state = clusterState.getCollectionOrNull(collection); - if (stateWatcher.onStateChanged(liveNodes, state) == true) { - removeCollectionStateWatcher(collection, stateWatcher); + if (stateWatcher.onStateChanged(state) == true) { + removeDocCollectionWatcher(collection, stateWatcher); } } /** * Block until a CollectionStatePredicate returns true, or the wait times out * + *

* Note that the predicate may be called again even after it has returned true, so * implementors should avoid changing state within the predicate call itself. + *

* + *

+ * This implementation utilizes {@link CollectionStateWatcher} internally. + * Callers that don't care about liveNodes are encouraged to use a {@link DocCollection} {@link Predicate} + * instead + *

+ * + * @see #waitForState(String, long, TimeUnit, Predicate) + * @see #registerCollectionStateWatcher * @param collection the collection to watch * @param wait how long to wait * @param unit the units of the wait parameter @@ -1665,13 +1713,60 @@ public class ZkStateReader implements SolrCloseable { waitLatches.remove(latch); } } + + /** + * Block until a Predicate returns true, or the wait times out + * + *

+ * Note that the predicate may be called again even after it has returned true, so + * implementors should avoid changing state within the predicate call itself. + *

+ * + * @param collection the collection to watch + * @param wait how long to wait + * @param unit the units of the wait parameter + * @param predicate the predicate to call on state changes + * @throws InterruptedException on interrupt + * @throws TimeoutException on timeout + */ + public void waitForState(final String collection, long wait, TimeUnit unit, Predicate predicate) + throws InterruptedException, TimeoutException { + + if (closed) { + throw new AlreadyClosedException(); + } + + final CountDownLatch latch = new CountDownLatch(1); + waitLatches.add(latch); + AtomicReference docCollection = new AtomicReference<>(); + DocCollectionWatcher watcher = (c) -> { + docCollection.set(c); + boolean matches = predicate.test(c); + if (matches) + latch.countDown(); + + return matches; + }; + registerDocCollectionWatcher(collection, watcher); + + try { + // wait for the watcher predicate to return true, or time out + if (!latch.await(wait, unit)) + throw new TimeoutException("Timeout waiting to see state for collection=" + collection + " :" + docCollection.get()); + + } + finally { + removeDocCollectionWatcher(collection, watcher); + waitLatches.remove(latch); + } + } /** * Block until a LiveNodesStatePredicate returns true, or the wait times out - * + *

* Note that the predicate may be called again even after it has returned true, so * implementors should avoid changing state within the predicate call itself. - * + *

* @param wait how long to wait * @param unit the units of the wait parameter * @param predicate the predicate to call on state changes @@ -1713,14 +1808,35 @@ public class ZkStateReader implements SolrCloseable { /** * Remove a watcher from a collection's watch list. - * + *

* This allows Zookeeper watches to be removed if there is no interest in the * collection. + *

* + * @see #registerCollectionStateWatcher * @param collection the collection * @param watcher the watcher */ public void removeCollectionStateWatcher(String collection, CollectionStateWatcher watcher) { + final DocCollectionAndLiveNodesWatcherWrapper wrapper + = new DocCollectionAndLiveNodesWatcherWrapper(collection, watcher); + + removeDocCollectionWatcher(collection, wrapper); + removeLiveNodesListener(wrapper); + } + + /** + * Remove a watcher from a collection's watch list. + *

+ * This allows Zookeeper watches to be removed if there is no interest in the + * collection. + *

+ * + * @see #registerDocCollectionWatcher + * @param collection the collection + * @param watcher the watcher + */ + public void removeDocCollectionWatcher(String collection, DocCollectionWatcher watcher) { AtomicBoolean reconstructState = new AtomicBoolean(false); collectionWatches.compute(collection, (k, v) -> { if (v == null) @@ -1742,8 +1858,8 @@ public class ZkStateReader implements SolrCloseable { } /* package-private for testing */ - Set getStateWatchers(String collection) { - final Set watchers = new HashSet<>(); + Set getStateWatchers(String collection) { + final Set watchers = new HashSet<>(); collectionWatches.compute(collection, (k, v) -> { if (v != null) { watchers.addAll(v.stateWatchers); @@ -1845,12 +1961,12 @@ public class ZkStateReader implements SolrCloseable { } } - private void notifyStateWatchers(Set liveNodes, String collection, DocCollection collectionState) { + private void notifyStateWatchers(String collection, DocCollection collectionState) { if (this.closed) { return; } try { - notifications.submit(new Notification(liveNodes, collection, collectionState)); + notifications.submit(new Notification(collection, collectionState)); } catch (RejectedExecutionException e) { if (closed == false) { @@ -1861,29 +1977,27 @@ public class ZkStateReader implements SolrCloseable { private class Notification implements Runnable { - final Set liveNodes; final String collection; final DocCollection collectionState; - private Notification(Set liveNodes, String collection, DocCollection collectionState) { - this.liveNodes = liveNodes; + private Notification(String collection, DocCollection collectionState) { this.collection = collection; this.collectionState = collectionState; } @Override public void run() { - List watchers = new ArrayList<>(); + List watchers = new ArrayList<>(); collectionWatches.compute(collection, (k, v) -> { if (v == null) return null; watchers.addAll(v.stateWatchers); return v; }); - for (CollectionStateWatcher watcher : watchers) { + for (DocCollectionWatcher watcher : watchers) { try { - if (watcher.onStateChanged(liveNodes, collectionState)) { - removeCollectionStateWatcher(collection, watcher); + if (watcher.onStateChanged(collectionState)) { + removeDocCollectionWatcher(collection, watcher); } } catch (Exception exception) { log.warn("Error on calling watcher", exception); @@ -2118,4 +2232,54 @@ public class ZkStateReader implements SolrCloseable { } } + /** + * Helper class that acts as both a {@link DocCollectionWatcher} and a {@link LiveNodesListener} + * while wraping and delegating to a {@link CollectionStateWatcher} + */ + private final class DocCollectionAndLiveNodesWatcherWrapper implements DocCollectionWatcher, LiveNodesListener { + private final String collectionName; + private final CollectionStateWatcher delegate; + + public int hashCode() { + return collectionName.hashCode() * delegate.hashCode(); + } + + public boolean equals(Object other) { + if (other instanceof DocCollectionAndLiveNodesWatcherWrapper) { + DocCollectionAndLiveNodesWatcherWrapper that + = (DocCollectionAndLiveNodesWatcherWrapper) other; + return this.collectionName.equals(that.collectionName) + && this.delegate.equals(that.delegate); + } + return false; + } + + public DocCollectionAndLiveNodesWatcherWrapper(final String collectionName, + final CollectionStateWatcher delegate) { + this.collectionName = collectionName; + this.delegate = delegate; + } + + @Override + public boolean onStateChanged(DocCollection collectionState) { + final boolean result = delegate.onStateChanged(ZkStateReader.this.liveNodes, + collectionState); + if (result) { + // it might be a while before live nodes changes, so proactively remove ourselves + removeLiveNodesListener(this); + } + return result; + } + + @Override + public boolean onChange(SortedSet oldLiveNodes, SortedSet newLiveNodes) { + final DocCollection collection = ZkStateReader.this.clusterState.getCollectionOrNull(collectionName); + final boolean result = delegate.onStateChanged(newLiveNodes, collection); + if (result) { + // it might be a while before collection changes, so proactively remove ourselves + removeDocCollectionWatcher(collectionName, this); + } + return result; + } + } } diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java index 89cba065539..f97b537e648 100644 --- a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java +++ b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java @@ -37,20 +37,22 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** @see TestDocCollectionWatcher */ public class TestCollectionStateWatchers extends SolrCloudTestCase { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final int CLUSTER_SIZE = 4; - private static final int MAX_WAIT_TIMEOUT = 30; + + private static final int MAX_WAIT_TIMEOUT = 120; // seconds, only use for await -- NO SLEEP!!! private ExecutorService executor = null; @Before public void prepareCluster() throws Exception { configureCluster(CLUSTER_SIZE) - .addConfig("config", getFile("solrj/solr/collection1/conf").toPath()) - .configure(); + .addConfig("config", getFile("solrj/solr/collection1/conf").toPath()) + .configure(); executor = ExecutorUtil.newMDCAwareCachedThreadPool("backgroundWatchers"); } @@ -75,6 +77,7 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase { private void waitFor(String message, long timeout, TimeUnit unit, Callable predicate) throws InterruptedException, ExecutionException { + Future future = executor.submit(() -> { try { while (true) { @@ -100,50 +103,71 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase { } @Test - //Commented 14-Oct-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 12-Jun-2018 - public void testSimpleCollectionWatch() throws Exception { - + public void testCollectionWatchWithShutdownOfActiveNode() throws Exception { + doTestCollectionWatchWithNodeShutdown(false); + } + + @Test + public void testCollectionWatchWithShutdownOfUnusedNode() throws Exception { + doTestCollectionWatchWithNodeShutdown(true); + } + + private void doTestCollectionWatchWithNodeShutdown(final boolean shutdownUnusedNode) + throws Exception { + CloudSolrClient client = cluster.getSolrClient(); - CollectionAdminRequest.createCollection("testcollection", "config", 4, 1) - .processAndWait(client, MAX_WAIT_TIMEOUT); + + // note: one node in our cluster is unsed by collection + CollectionAdminRequest.createCollection("testcollection", "config", CLUSTER_SIZE, 1) + .processAndWait(client, MAX_WAIT_TIMEOUT); client.waitForState("testcollection", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, - (n, c) -> DocCollection.isFullyActive(n, c, 4, 1)); + (n, c) -> DocCollection.isFullyActive(n, c, CLUSTER_SIZE, 1)); + final JettySolrRunner extraJetty = cluster.startJettySolrRunner(); + final JettySolrRunner jettyToShutdown + = shutdownUnusedNode ? extraJetty : cluster.getJettySolrRunners().get(0); + final int expectedNodesWithActiveReplicas = CLUSTER_SIZE - (shutdownUnusedNode ? 0 : 1); + + cluster.waitForAllNodes(MAX_WAIT_TIMEOUT); + // shutdown a node and check that we get notified about the change final CountDownLatch latch = new CountDownLatch(1); client.registerCollectionStateWatcher("testcollection", (liveNodes, collectionState) -> { - int nodeCount = 0; + int nodesWithActiveReplicas = 0; log.info("State changed: {}", collectionState); for (Slice slice : collectionState) { for (Replica replica : slice) { if (replica.isActive(liveNodes)) - nodeCount++; + nodesWithActiveReplicas++; } } - if (nodeCount == 3) { + if (liveNodes.size() == CLUSTER_SIZE + && expectedNodesWithActiveReplicas == nodesWithActiveReplicas) { latch.countDown(); return true; } return false; }); - JettySolrRunner j = cluster.stopJettySolrRunner(random().nextInt(cluster.getJettySolrRunners().size())); - cluster.waitForJettyToStop(j); - assertTrue("CollectionStateWatcher was never notified of cluster change", latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS)); + cluster.stopJettySolrRunner(jettyToShutdown); + cluster.waitForJettyToStop(jettyToShutdown); + + assertTrue("CollectionStateWatcher was never notified of cluster change", + latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS)); - waitFor("CollectionStateWatcher wasn't cleared after completion", 1, TimeUnit.SECONDS, - () -> client.getZkStateReader().getStateWatchers("testcollection").isEmpty()); + waitFor("CollectionStateWatcher wasn't cleared after completion", + MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, + () -> client.getZkStateReader().getStateWatchers("testcollection").isEmpty()); } @Test - // commented 20-July-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 12-Jun-2018 public void testStateWatcherChecksCurrentStateOnRegister() throws Exception { CloudSolrClient client = cluster.getSolrClient(); CollectionAdminRequest.createCollection("currentstate", "config", 1, 1) - .processAndWait(client, MAX_WAIT_TIMEOUT); + .processAndWait(client, MAX_WAIT_TIMEOUT); final CountDownLatch latch = new CountDownLatch(1); client.registerCollectionStateWatcher("currentstate", (n, c) -> { @@ -151,9 +175,10 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase { return false; }); - assertTrue("CollectionStateWatcher isn't called on new registration", latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS)); + assertTrue("CollectionStateWatcher isn't called on new registration", + latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS)); assertEquals("CollectionStateWatcher should be retained", - 1, client.getZkStateReader().getStateWatchers("currentstate").size()); + 1, client.getZkStateReader().getStateWatchers("currentstate").size()); final CountDownLatch latch2 = new CountDownLatch(1); client.registerCollectionStateWatcher("currentstate", (n, c) -> { @@ -162,9 +187,9 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase { }); assertTrue("CollectionStateWatcher isn't called when registering for already-watched collection", - latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS)); - waitFor("CollectionStateWatcher should be removed", 1, TimeUnit.SECONDS, - () -> client.getZkStateReader().getStateWatchers("currentstate").size() == 1); + latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS)); + waitFor("CollectionStateWatcher should be removed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, + () -> client.getZkStateReader().getStateWatchers("currentstate").size() == 1); } @Test @@ -172,17 +197,17 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase { CloudSolrClient client = cluster.getSolrClient(); CollectionAdminRequest.createCollection("waitforstate", "config", 1, 1) - .processAndWait(client, MAX_WAIT_TIMEOUT); + .processAndWait(client, MAX_WAIT_TIMEOUT); client.waitForState("waitforstate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, - (n, c) -> DocCollection.isFullyActive(n, c, 1, 1)); + (n, c) -> DocCollection.isFullyActive(n, c, 1, 1)); // several goes, to check that we're not getting delayed state changes for (int i = 0; i < 10; i++) { try { - client.waitForState("waitforstate", 1, TimeUnit.SECONDS, (n, c) -> DocCollection.isFullyActive(n, c, 1, 1)); - } - catch (TimeoutException e) { + client.waitForState("waitforstate", 1, TimeUnit.SECONDS, + (n, c) -> DocCollection.isFullyActive(n, c, 1, 1)); + } catch (TimeoutException e) { fail("waitForState should return immediately if the predicate is already satisfied"); } } @@ -190,114 +215,145 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase { } @Test - // commented 20-July-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 12-Jun-2018 - @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // annotated on: 24-Dec-2018 public void testCanWaitForNonexistantCollection() throws Exception { Future future = waitInBackground("delayed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, - (n, c) -> DocCollection.isFullyActive(n, c, 1, 1)); + (n, c) -> DocCollection.isFullyActive(n, c, 1, 1)); CollectionAdminRequest.createCollection("delayed", "config", 1, 1) - .processAndWait(cluster.getSolrClient(), MAX_WAIT_TIMEOUT); + .processAndWait(cluster.getSolrClient(), MAX_WAIT_TIMEOUT); assertTrue("waitForState was not triggered by collection creation", future.get()); } @Test - // commented 20-July-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 12-Jun-2018 public void testPredicateFailureTimesOut() throws Exception { CloudSolrClient client = cluster.getSolrClient(); expectThrows(TimeoutException.class, () -> { - client.waitForState("nosuchcollection", 1, TimeUnit.SECONDS, ((liveNodes, collectionState) -> false)); + client.waitForState("nosuchcollection", 1, TimeUnit.SECONDS, + ((liveNodes, collectionState) -> false)); }); - waitFor("Watchers for collection should be removed after timeout", 1, TimeUnit.SECONDS, - () -> client.getZkStateReader().getStateWatchers("nosuchcollection").isEmpty()); + waitFor("Watchers for collection should be removed after timeout", + MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, + () -> client.getZkStateReader().getStateWatchers("nosuchcollection").isEmpty()); } @Test - //Commented 14-Oct-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 12-Jun-2018 public void testWaitForStateWatcherIsRetainedOnPredicateFailure() throws Exception { CloudSolrClient client = cluster.getSolrClient(); CollectionAdminRequest.createCollection("falsepredicate", "config", 4, 1) - .processAndWait(client, MAX_WAIT_TIMEOUT); + .processAndWait(client, MAX_WAIT_TIMEOUT); client.waitForState("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, - (n, c) -> DocCollection.isFullyActive(n, c, 4, 1)); + (n, c) -> DocCollection.isFullyActive(n, c, 4, 1)); final CountDownLatch firstCall = new CountDownLatch(1); // stop a node, then add a watch waiting for all nodes to be back up - JettySolrRunner node1 = cluster.stopJettySolrRunner(random().nextInt(cluster.getJettySolrRunners().size())); + JettySolrRunner node1 = cluster.stopJettySolrRunner(random().nextInt + (cluster.getJettySolrRunners().size())); cluster.waitForJettyToStop(node1); - Future future = waitInBackground("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, (liveNodes, collectionState) -> { + Future future = waitInBackground("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, + (liveNodes, collectionState) -> { firstCall.countDown(); return DocCollection.isFullyActive(liveNodes, collectionState, 4, 1); }); // first, stop another node; the watch should not be fired after this! - JettySolrRunner node2 = cluster.stopJettySolrRunner(random().nextInt(cluster.getJettySolrRunners().size())); + JettySolrRunner node2 = cluster.stopJettySolrRunner(random().nextInt + (cluster.getJettySolrRunners().size())); // now start them both back up cluster.startJettySolrRunner(node1); - assertTrue("CollectionStateWatcher not called after 30 seconds", firstCall.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS)); + assertTrue("CollectionStateWatcher not called", + firstCall.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS)); cluster.startJettySolrRunner(node2); Boolean result = future.get(); - assertTrue("Did not see a fully active cluster after 30 seconds", result); + assertTrue("Did not see a fully active cluster", result); } @Test - // commented 20-July-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 12-Jun-2018 public void testWatcherIsRemovedAfterTimeout() throws Exception { CloudSolrClient client = cluster.getSolrClient(); assertTrue("There should be no watchers for a non-existent collection!", - client.getZkStateReader().getStateWatchers("no-such-collection").isEmpty()); + client.getZkStateReader().getStateWatchers("no-such-collection").isEmpty()); expectThrows(TimeoutException.class, () -> { - client.waitForState("no-such-collection", 10, TimeUnit.MILLISECONDS, (n, c) -> DocCollection.isFullyActive(n, c, 1, 1)); + client.waitForState("no-such-collection", 10, TimeUnit.MILLISECONDS, + (n, c) -> DocCollection.isFullyActive(n, c, 1, 1)); }); - waitFor("Watchers for collection should be removed after timeout", 1, TimeUnit.SECONDS, - () -> client.getZkStateReader().getStateWatchers("no-such-collection").isEmpty()); + waitFor("Watchers for collection should be removed after timeout", + MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, + () -> client.getZkStateReader().getStateWatchers("no-such-collection").isEmpty()); } @Test - // commented 20-July-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 09-Apr-2018 public void testDeletionsTriggerWatches() throws Exception { CollectionAdminRequest.createCollection("tobedeleted", "config", 1, 1) - .process(cluster.getSolrClient()); - Future future = waitInBackground("tobedeleted", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, (l, c) -> c == null); + .process(cluster.getSolrClient()); + + Future future = waitInBackground("tobedeleted", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, + (l, c) -> c == null); CollectionAdminRequest.deleteCollection("tobedeleted").process(cluster.getSolrClient()); - assertTrue("CollectionStateWatcher not notified of delete call after 30 seconds", future.get()); + assertTrue("CollectionStateWatcher not notified of delete call", future.get()); + } + + @Test + public void testLiveNodeChangesTriggerWatches() throws Exception { + final CloudSolrClient client = cluster.getSolrClient(); + + CollectionAdminRequest.createCollection("test_collection", "config", 1, 1).process(client); + + Future future = waitInBackground("test_collection", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, + (l, c) -> (l.size() == 1 + CLUSTER_SIZE)); + + JettySolrRunner unusedJetty = cluster.startJettySolrRunner(); + assertTrue("CollectionStateWatcher not notified of new node", future.get()); + + waitFor("CollectionStateWatcher should be removed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, + () -> client.getZkStateReader().getStateWatchers("test_collection").size() == 0); + + future = waitInBackground("test_collection", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, + (l, c) -> (l.size() == CLUSTER_SIZE)); + + cluster.stopJettySolrRunner(unusedJetty); + + assertTrue("CollectionStateWatcher not notified of node lost", future.get()); + + waitFor("CollectionStateWatcher should be removed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, + () -> client.getZkStateReader().getStateWatchers("test_collection").size() == 0); + } @Test - //Commented 14-Oct-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 12-Jun-2018 public void testWatchesWorkForStateFormat1() throws Exception { final CloudSolrClient client = cluster.getSolrClient(); Future future = waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, - (n, c) -> DocCollection.isFullyActive(n, c, 1, 1)); + (n, c) -> DocCollection.isFullyActive(n, c, 1, 1)); CollectionAdminRequest.createCollection("stateformat1", "config", 1, 1).setStateFormat(1) - .processAndWait(client, MAX_WAIT_TIMEOUT); - assertTrue("CollectionStateWatcher not notified of stateformat=1 collection creation", future.get()); + .processAndWait(client, MAX_WAIT_TIMEOUT); + assertTrue("CollectionStateWatcher not notified of stateformat=1 collection creation", + future.get()); - Future migrated - = waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, - (n, c) -> c != null && c.getStateFormat() == 2); + Future migrated = waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, + (n, c) -> c != null && c.getStateFormat() == 2); - CollectionAdminRequest.migrateCollectionFormat("stateformat1").processAndWait(client, MAX_WAIT_TIMEOUT); + CollectionAdminRequest.migrateCollectionFormat("stateformat1") + .processAndWait(client, MAX_WAIT_TIMEOUT); assertTrue("CollectionStateWatcher did not persist over state format migration", migrated.get()); } diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/TestDocCollectionWatcher.java b/solr/solrj/src/test/org/apache/solr/common/cloud/TestDocCollectionWatcher.java new file mode 100644 index 00000000000..f024a1c929c --- /dev/null +++ b/solr/solrj/src/test/org/apache/solr/common/cloud/TestDocCollectionWatcher.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.common.cloud; + +import java.lang.invoke.MethodHandles; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Predicate; + +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.cloud.SolrCloudTestCase; +import org.apache.solr.common.util.ExecutorUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** @see TestCollectionStateWatchers */ +public class TestDocCollectionWatcher extends SolrCloudTestCase { + + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static final int CLUSTER_SIZE = 4; + + private static final int MAX_WAIT_TIMEOUT = 120; // seconds, only use for await -- NO SLEEP!!! + + private ExecutorService executor = null; + + @Before + public void prepareCluster() throws Exception { + configureCluster(CLUSTER_SIZE) + .addConfig("config", getFile("solrj/solr/collection1/conf").toPath()) + .configure(); + executor = ExecutorUtil.newMDCAwareCachedThreadPool("backgroundWatchers"); + } + + @After + public void tearDownCluster() throws Exception { + executor.shutdown(); + shutdownCluster(); + executor = null; + } + + private Future waitInBackground(String collection, long timeout, TimeUnit unit, + Predicate predicate) { + return executor.submit(() -> { + try { + cluster.getSolrClient().waitForState(collection, timeout, unit, predicate); + } catch (InterruptedException | TimeoutException e) { + return Boolean.FALSE; + } + return Boolean.TRUE; + }); + } + + private void waitFor(String message, long timeout, TimeUnit unit, Callable predicate) + throws InterruptedException, ExecutionException { + + Future future = executor.submit(() -> { + try { + while (true) { + if (predicate.call()) + return true; + TimeUnit.MILLISECONDS.sleep(10); + } + } + catch (InterruptedException e) { + return false; + } + }); + try { + if (future.get(timeout, unit) == true) { + return; + } + } + catch (TimeoutException e) { + // pass failure message on + } + future.cancel(true); + fail(message); + } + + @Test + public void testStateWatcherChecksCurrentStateOnRegister() throws Exception { + + CloudSolrClient client = cluster.getSolrClient(); + CollectionAdminRequest.createCollection("currentstate", "config", 1, 1) + .processAndWait(client, MAX_WAIT_TIMEOUT); + + final CountDownLatch latch = new CountDownLatch(1); + client.registerDocCollectionWatcher("currentstate", (c) -> { + latch.countDown(); + return false; + }); + + assertTrue("DocCollectionWatcher isn't called on new registration", + latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS)); + assertEquals("DocCollectionWatcher should be retained", + 1, client.getZkStateReader().getStateWatchers("currentstate").size()); + + final CountDownLatch latch2 = new CountDownLatch(1); + client.registerDocCollectionWatcher("currentstate", (c) -> { + latch2.countDown(); + return true; + }); + + assertTrue("DocCollectionWatcher isn't called when registering for already-watched collection", + latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS)); + waitFor("DocCollectionWatcher should be removed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, + () -> client.getZkStateReader().getStateWatchers("currentstate").size() == 1); + } + + @Test + public void testWaitForStateChecksCurrentState() throws Exception { + + CloudSolrClient client = cluster.getSolrClient(); + CollectionAdminRequest.createCollection("waitforstate", "config", 1, 1) + .processAndWait(client, MAX_WAIT_TIMEOUT); + + client.waitForState("waitforstate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, + (n, c) -> DocCollection.isFullyActive(n, c, 1, 1)); + + // several goes, to check that we're not getting delayed state changes + for (int i = 0; i < 10; i++) { + try { + client.waitForState("waitforstate", 1, TimeUnit.SECONDS, + (n, c) -> DocCollection.isFullyActive(n, c, 1, 1)); + } catch (TimeoutException e) { + fail("waitForState should return immediately if the predicate is already satisfied"); + } + } + + } + + @Test + public void testCanWaitForNonexistantCollection() throws Exception { + + Future future = waitInBackground("delayed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, + (c) -> (null != c)); + + CollectionAdminRequest.createCollection("delayed", "config", 1, 1) + .processAndWait(cluster.getSolrClient(), MAX_WAIT_TIMEOUT); + + assertTrue("waitForState was not triggered by collection creation", future.get()); + + } + + @Test + public void testPredicateFailureTimesOut() throws Exception { + CloudSolrClient client = cluster.getSolrClient(); + expectThrows(TimeoutException.class, () -> { + client.waitForState("nosuchcollection", 1, TimeUnit.SECONDS, + ((liveNodes, collectionState) -> false)); + }); + waitFor("Watchers for collection should be removed after timeout", + MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, + () -> client.getZkStateReader().getStateWatchers("nosuchcollection").isEmpty()); + + } + + @Test + public void testWaitForStateWatcherIsRetainedOnPredicateFailure() throws Exception { + + CloudSolrClient client = cluster.getSolrClient(); + CollectionAdminRequest.createCollection("falsepredicate", "config", 1, 1) + .processAndWait(client, MAX_WAIT_TIMEOUT); + + // create collection with 1 shard 1 replica... + client.waitForState("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, + (n, c) -> DocCollection.isFullyActive(n, c, 1, 1)); + + // set watcher waiting for at least 3 replicas (will fail initially) + final AtomicInteger runCount = new AtomicInteger(0); + final Future future = waitInBackground + ("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, + (collectionState) -> { + runCount.incrementAndGet(); + int replicas = 0; + for (Slice slice : collectionState) { + for (Replica replica : slice) { + replicas++; + } + } + return 3 <= replicas; + }); + + // add a 2nd replica... + CollectionAdminRequest.addReplicaToShard("falsepredicate", "shard1") + .processAndWait(client, MAX_WAIT_TIMEOUT); + client.waitForState("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, + (n, c) -> DocCollection.isFullyActive(n, c, 1, 2)); + + // confirm watcher has run at least once and has been retained... + final int runCountSnapshot = runCount.get(); + assertTrue(0 < runCountSnapshot); + assertEquals(1, client.getZkStateReader().getStateWatchers("falsepredicate").size()); + + // now add a 3rd replica... + CollectionAdminRequest.addReplicaToShard("falsepredicate", "shard1") + .processAndWait(client, MAX_WAIT_TIMEOUT); + + // now confirm watcher is invoked & removed + assertTrue("watcher never succeeded", future.get()); + assertTrue(runCountSnapshot < runCount.get()); + waitFor("DocCollectionWatcher should be removed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, + () -> client.getZkStateReader().getStateWatchers("falsepredicate").size() == 0); + + } + + @Test + public void testWatcherIsRemovedAfterTimeout() throws Exception { + CloudSolrClient client = cluster.getSolrClient(); + assertTrue("There should be no watchers for a non-existent collection!", + client.getZkStateReader().getStateWatchers("no-such-collection").isEmpty()); + + expectThrows(TimeoutException.class, () -> { + client.waitForState("no-such-collection", 10, TimeUnit.MILLISECONDS, + (c) -> (false)); + }); + + waitFor("Watchers for collection should be removed after timeout", + MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, + () -> client.getZkStateReader().getStateWatchers("no-such-collection").isEmpty()); + + } + + @Test + public void testDeletionsTriggerWatches() throws Exception { + final CloudSolrClient client = cluster.getSolrClient(); + CollectionAdminRequest.createCollection("tobedeleted", "config", 1, 1).process(client); + + client.waitForState("tobedeleted", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, + (n, c) -> DocCollection.isFullyActive(n, c, 1, 1)); + + Future future = waitInBackground("tobedeleted", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, + (c) -> c == null); + + CollectionAdminRequest.deleteCollection("tobedeleted").process(client); + + assertTrue("DocCollectionWatcher not notified of delete call", future.get()); + } + + @Test + public void testWatchesWorkForStateFormat1() throws Exception { + + final CloudSolrClient client = cluster.getSolrClient(); + + Future future = waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, + (c) -> (null != c) ); + + CollectionAdminRequest.createCollection("stateformat1", "config", 1, 1).setStateFormat(1) + .processAndWait(client, MAX_WAIT_TIMEOUT); + client.waitForState("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, + (n, c) -> DocCollection.isFullyActive(n, c, 1, 1)); + + assertTrue("DocCollectionWatcher not notified of stateformat=1 collection creation", + future.get()); + + Future migrated = waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, + (c) -> c != null && c.getStateFormat() == 2); + + CollectionAdminRequest.migrateCollectionFormat("stateformat1") + .processAndWait(client, MAX_WAIT_TIMEOUT); + assertTrue("DocCollectionWatcher did not persist over state format migration", migrated.get()); + + } + +} diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java index eb2fd266ece..c9332bbc526 100644 --- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java +++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java @@ -205,7 +205,7 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes throws Exception { log.info("Wait for collection to disappear - collection: " + collection + " failOnTimeout:" + failOnTimeout + " timeout (sec):" + timeoutSeconds); - zkStateReader.waitForState(collection, timeoutSeconds, TimeUnit.SECONDS, (liveNodes, docCollection) -> { + zkStateReader.waitForState(collection, timeoutSeconds, TimeUnit.SECONDS, (docCollection) -> { if (docCollection == null) return true; return false; @@ -237,7 +237,7 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes Thread.sleep(100); } - zkStateReader.waitForState("collection1", timeOut.timeLeft(SECONDS), TimeUnit.SECONDS, (liveNodes, docCollection) -> { + zkStateReader.waitForState("collection1", timeOut.timeLeft(SECONDS), TimeUnit.SECONDS, (docCollection) -> { if (docCollection == null) return false; @@ -253,7 +253,7 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes public static void verifyReplicaStatus(ZkStateReader reader, String collection, String shard, String coreNodeName, Replica.State expectedState) throws InterruptedException, TimeoutException { reader.waitForState(collection, 15000, TimeUnit.MILLISECONDS, - (liveNodes, collectionState) -> collectionState != null && collectionState.getSlice(shard) != null + (collectionState) -> collectionState != null && collectionState.getSlice(shard) != null && collectionState.getSlice(shard).getReplicasMap().get(coreNodeName) != null && collectionState.getSlice(shard).getReplicasMap().get(coreNodeName).getState() == expectedState); } diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java index cdb6f68a1b4..ff87246b362 100644 --- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java +++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java @@ -405,7 +405,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes .setCreateNodeSet("") .process(cloudClient).getStatus()); - cloudClient.waitForState(DEFAULT_COLLECTION, 30, TimeUnit.SECONDS, (l,c) -> c != null && c.getSlices().size() == sliceCount); + cloudClient.waitForState(DEFAULT_COLLECTION, 30, TimeUnit.SECONDS, (c) -> c != null && c.getSlices().size() == sliceCount); ExecutorService customThreadPool = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("closeThreadPool")); @@ -585,7 +585,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes protected void waitForActiveReplicaCount(CloudSolrClient client, String collection, int expectedNumReplicas) throws TimeoutException, NotInClusterStateException { AtomicInteger nReplicas = new AtomicInteger(); try { - client.getZkStateReader().waitForState(collection, 30, TimeUnit.SECONDS, (n, c) -> { + client.getZkStateReader().waitForState(collection, 30, TimeUnit.SECONDS, (c) -> { if (c == null) return false; int numReplicas = getTotalReplicas(c, c.getName()); diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java index 5faa30715e8..a97832bf5c3 100644 --- a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java +++ b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java @@ -553,7 +553,7 @@ public class MiniSolrCloudCluster { } for (String collection : reader.getClusterState().getCollectionStates().keySet()) { - reader.waitForState(collection, 15, TimeUnit.SECONDS, (liveNodes, collectionState) -> collectionState == null ? true : false); + reader.waitForState(collection, 15, TimeUnit.SECONDS, (collectionState) -> collectionState == null ? true : false); } }