SOLR-13490: Fix CollectionStateWatcher/CollectionStatePredicate based APIs in ZkStateReader and CloudSolrClient to be triggered on liveNode changes.

Also add Predicate<DocCollection> equivilents for callers that don't care about liveNodes.

(cherry picked from commit 5a974860fa83408a86ca64b417f3111b037da7eb)
This commit is contained in:
Chris Hostetter 2019-06-17 09:59:43 -07:00
parent a7af74f0c4
commit 2f2333a781
19 changed files with 889 additions and 112 deletions

View File

@ -122,6 +122,10 @@ Bug Fixes
* SOLR-13333: unleashing terms.ttf from terms.list when distrib=false (Munendra S N via Mikhail Khludnev) * SOLR-13333: unleashing terms.ttf from terms.list when distrib=false (Munendra S N via Mikhail Khludnev)
* SOLR-13490: Fix CollectionStateWatcher/CollectionStatePredicate based APIs in ZkStateReader and
CloudSolrClient to be triggered on liveNode changes. Also add Predicate<DocCollection> equivilents
for callers that don't care about liveNodes. (hossman)
Other Changes Other Changes
---------------------- ----------------------

View File

@ -68,12 +68,12 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.BeforeReconnect; import org.apache.solr.common.cloud.BeforeReconnect;
import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.CollectionStateWatcher;
import org.apache.solr.common.cloud.ConnectionManager; import org.apache.solr.common.cloud.ConnectionManager;
import org.apache.solr.common.cloud.DefaultConnectionStrategy; import org.apache.solr.common.cloud.DefaultConnectionStrategy;
import org.apache.solr.common.cloud.DefaultZkACLProvider; import org.apache.solr.common.cloud.DefaultZkACLProvider;
import org.apache.solr.common.cloud.DefaultZkCredentialsProvider; import org.apache.solr.common.cloud.DefaultZkCredentialsProvider;
import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocCollectionWatcher;
import org.apache.solr.common.cloud.LiveNodesListener; import org.apache.solr.common.cloud.LiveNodesListener;
import org.apache.solr.common.cloud.OnReconnect; import org.apache.solr.common.cloud.OnReconnect;
import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Replica;
@ -1052,7 +1052,7 @@ public class ZkController implements Closeable {
CountDownLatch latch = new CountDownLatch(collectionsWithLocalReplica.size()); CountDownLatch latch = new CountDownLatch(collectionsWithLocalReplica.size());
for (String collectionWithLocalReplica : collectionsWithLocalReplica) { for (String collectionWithLocalReplica : collectionsWithLocalReplica) {
zkStateReader.registerCollectionStateWatcher(collectionWithLocalReplica, (liveNodes, collectionState) -> { zkStateReader.registerDocCollectionWatcher(collectionWithLocalReplica, (collectionState) -> {
if (collectionState == null) return false; if (collectionState == null) return false;
boolean foundStates = true; boolean foundStates = true;
for (CoreDescriptor coreDescriptor : cc.getCoreDescriptors()) { for (CoreDescriptor coreDescriptor : cc.getCoreDescriptors()) {
@ -1194,7 +1194,7 @@ public class ZkController implements Closeable {
// check replica's existence in clusterstate first // check replica's existence in clusterstate first
try { try {
zkStateReader.waitForState(collection, Overseer.isLegacy(zkStateReader) ? 60000 : 100, zkStateReader.waitForState(collection, Overseer.isLegacy(zkStateReader) ? 60000 : 100,
TimeUnit.MILLISECONDS, (liveNodes, collectionState) -> getReplicaOrNull(collectionState, shardId, coreZkNodeName) != null); TimeUnit.MILLISECONDS, (collectionState) -> getReplicaOrNull(collectionState, shardId, coreZkNodeName) != null);
} catch (TimeoutException e) { } catch (TimeoutException e) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Error registering SolrCore, timeout waiting for replica present in clusterstate"); throw new SolrException(ErrorCode.SERVER_ERROR, "Error registering SolrCore, timeout waiting for replica present in clusterstate");
} }
@ -1301,7 +1301,7 @@ public class ZkController implements Closeable {
// make sure we have an update cluster state right away // make sure we have an update cluster state right away
zkStateReader.forceUpdateCollection(collection); zkStateReader.forceUpdateCollection(collection);
// the watcher is added to a set so multiple calls of this method will left only one watcher // the watcher is added to a set so multiple calls of this method will left only one watcher
zkStateReader.registerCollectionStateWatcher(cloudDesc.getCollectionName(), zkStateReader.registerDocCollectionWatcher(cloudDesc.getCollectionName(),
new UnloadCoreOnDeletedWatcher(coreZkNodeName, shardId, desc.getName())); new UnloadCoreOnDeletedWatcher(coreZkNodeName, shardId, desc.getName()));
return shardId; return shardId;
} finally { } finally {
@ -1850,7 +1850,7 @@ public class ZkController implements Closeable {
AtomicReference<String> errorMessage = new AtomicReference<>(); AtomicReference<String> errorMessage = new AtomicReference<>();
AtomicReference<DocCollection> collectionState = new AtomicReference<>(); AtomicReference<DocCollection> collectionState = new AtomicReference<>();
try { try {
zkStateReader.waitForState(cd.getCollectionName(), 10, TimeUnit.SECONDS, (n, c) -> { zkStateReader.waitForState(cd.getCollectionName(), 10, TimeUnit.SECONDS, (c) -> {
collectionState.set(c); collectionState.set(c);
if (c == null) if (c == null)
return false; return false;
@ -2550,7 +2550,7 @@ public class ZkController implements Closeable {
}; };
} }
private class UnloadCoreOnDeletedWatcher implements CollectionStateWatcher { private class UnloadCoreOnDeletedWatcher implements DocCollectionWatcher {
String coreNodeName; String coreNodeName;
String shard; String shard;
String coreName; String coreName;
@ -2563,7 +2563,7 @@ public class ZkController implements Closeable {
@Override @Override
// synchronized due to SOLR-11535 // synchronized due to SOLR-11535
public synchronized boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) { public synchronized boolean onStateChanged(DocCollection collectionState) {
if (getCoreContainer().getCoreDescriptor(coreName) == null) return true; if (getCoreContainer().getCoreDescriptor(coreName) == null) return true;
boolean replicaRemoved = getReplicaOrNull(collectionState, shard, coreNodeName) == null; boolean replicaRemoved = getReplicaOrNull(collectionState, shard, coreNodeName) == null;

View File

@ -321,7 +321,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
CollectionAdminParams.COLOCATED_WITH, collectionName); CollectionAdminParams.COLOCATED_WITH, collectionName);
ocmh.overseer.offerStateUpdate(Utils.toJSON(props)); ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
try { try {
zkStateReader.waitForState(withCollection, 5, TimeUnit.SECONDS, (liveNodes, collectionState) -> collectionName.equals(collectionState.getStr(COLOCATED_WITH))); zkStateReader.waitForState(withCollection, 5, TimeUnit.SECONDS, (collectionState) -> collectionName.equals(collectionState.getStr(COLOCATED_WITH)));
} catch (TimeoutException e) { } catch (TimeoutException e) {
log.warn("Timed out waiting to see the " + COLOCATED_WITH + " property set on collection: " + withCollection); log.warn("Timed out waiting to see the " + COLOCATED_WITH + " property set on collection: " + withCollection);
// maybe the overseer queue is backed up, we don't want to fail the create request // maybe the overseer queue is backed up, we don't want to fail the create request

View File

@ -134,7 +134,7 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
ocmh.overseer.offerStateUpdate(Utils.toJSON(m)); ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
// wait for a while until we don't see the collection // wait for a while until we don't see the collection
zkStateReader.waitForState(collection, 60, TimeUnit.SECONDS, (liveNodes, collectionState) -> collectionState == null); zkStateReader.waitForState(collection, 60, TimeUnit.SECONDS, (collectionState) -> collectionState == null);
// we can delete any remaining unique aliases // we can delete any remaining unique aliases
if (!aliasReferences.isEmpty()) { if (!aliasReferences.isEmpty()) {

View File

@ -134,7 +134,7 @@ public class DeleteShardCmd implements OverseerCollectionMessageHandler.Cmd {
ZkStateReader zkStateReader = ocmh.zkStateReader; ZkStateReader zkStateReader = ocmh.zkStateReader;
ocmh.overseer.offerStateUpdate(Utils.toJSON(m)); ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
zkStateReader.waitForState(collectionName, 45, TimeUnit.SECONDS, (l, c) -> c.getSlice(sliceId) == null); zkStateReader.waitForState(collectionName, 45, TimeUnit.SECONDS, (c) -> c.getSlice(sliceId) == null);
log.info("Successfully deleted collection: " + collectionName + ", shard: " + sliceId); log.info("Successfully deleted collection: " + collectionName + ", shard: " + sliceId);
} catch (SolrException e) { } catch (SolrException e) {

View File

@ -416,7 +416,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
boolean waitForCoreNodeGone(String collectionName, String shard, String replicaName, int timeoutms) throws InterruptedException { boolean waitForCoreNodeGone(String collectionName, String shard, String replicaName, int timeoutms) throws InterruptedException {
try { try {
zkStateReader.waitForState(collectionName, timeoutms, TimeUnit.MILLISECONDS, (n, c) -> { zkStateReader.waitForState(collectionName, timeoutms, TimeUnit.MILLISECONDS, (c) -> {
if (c == null) if (c == null)
return true; return true;
Slice slice = c.getSlice(shard); Slice slice = c.getSlice(shard);

View File

@ -35,8 +35,8 @@ import org.apache.solr.client.solrj.request.CoreStatus;
import org.apache.solr.cloud.overseer.OverseerAction; import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.CollectionStateWatcher;
import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocCollectionWatcher;
import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkNodeProps;
@ -115,7 +115,7 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
JettySolrRunner replicaJetty = cluster.getReplicaJetty(replica); JettySolrRunner replicaJetty = cluster.getReplicaJetty(replica);
ZkStateReaderAccessor accessor = new ZkStateReaderAccessor(replicaJetty.getCoreContainer().getZkController().getZkStateReader()); ZkStateReaderAccessor accessor = new ZkStateReaderAccessor(replicaJetty.getCoreContainer().getZkController().getZkStateReader());
Set<CollectionStateWatcher> watchers = accessor.getStateWatchers(collectionName); Set<DocCollectionWatcher> watchers = accessor.getStateWatchers(collectionName);
CollectionAdminRequest.deleteReplica(collectionName, shard.getName(), replica.getName()) CollectionAdminRequest.deleteReplica(collectionName, shard.getName(), replica.getName())
.process(cluster.getSolrClient()); .process(cluster.getSolrClient());
waitForState("Expected replica " + replica.getName() + " to have been removed", collectionName, (n, c) -> { waitForState("Expected replica " + replica.getName() + " to have been removed", collectionName, (n, c) -> {
@ -221,7 +221,7 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
Replica replica = getRandomReplica(shard); Replica replica = getRandomReplica(shard);
JettySolrRunner replicaJetty = cluster.getReplicaJetty(replica); JettySolrRunner replicaJetty = cluster.getReplicaJetty(replica);
ZkStateReaderAccessor accessor = new ZkStateReaderAccessor(replicaJetty.getCoreContainer().getZkController().getZkStateReader()); ZkStateReaderAccessor accessor = new ZkStateReaderAccessor(replicaJetty.getCoreContainer().getZkController().getZkStateReader());
Set<CollectionStateWatcher> watchers = accessor.getStateWatchers(collectionName); Set<DocCollectionWatcher> watchers = accessor.getStateWatchers(collectionName);
ZkNodeProps m = new ZkNodeProps( ZkNodeProps m = new ZkNodeProps(
Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(), Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(),

View File

@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.embedded.JettyConfig;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.common.cloud.CollectionStatePredicate;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.util.DefaultSolrThreadFactory;
import static org.apache.solr.cloud.SolrCloudTestCase.clusterShape;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestWaitForStateWithJettyShutdowns extends SolrTestCaseJ4 {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public void testWaitForStateAfterShutDown() throws Exception {
final String col_name = "test_col";
final MiniSolrCloudCluster cluster = new MiniSolrCloudCluster
(1, createTempDir(), JettyConfig.builder().build());
try {
log.info("Create our collection");
CollectionAdminRequest.createCollection(col_name, "_default", 1, 1).process(cluster.getSolrClient());
log.info("Sanity check that our collection has come online");
cluster.getSolrClient().waitForState(col_name, 30, TimeUnit.SECONDS, clusterShape(1, 1));
log.info("Shutdown 1 node");
final JettySolrRunner nodeToStop = cluster.getJettySolrRunner(0);
nodeToStop.stop();
log.info("Wait to confirm our node is fully shutdown");
cluster.waitForJettyToStop(nodeToStop);
// now that we're confident that node has stoped, check if a waitForState
// call will detect the missing replica -- shouldn't need long wait times (we know it's down)...
log.info("Now check if waitForState will recognize we already have the exepcted state");
cluster.getSolrClient().waitForState(col_name, 500, TimeUnit.MILLISECONDS, clusterShape(1, 0));
} finally {
cluster.shutdown();
}
}
public void testWaitForStateBeforeShutDown() throws Exception {
final String col_name = "test_col";
final ExecutorService executor = ExecutorUtil.newMDCAwareFixedThreadPool
(1, new DefaultSolrThreadFactory("background_executor"));
final MiniSolrCloudCluster cluster = new MiniSolrCloudCluster
(1, createTempDir(), JettyConfig.builder().build());
try {
log.info("Create our collection");
CollectionAdminRequest.createCollection(col_name, "_default", 1, 1).process(cluster.getSolrClient());
log.info("Sanity check that our collection has come online");
cluster.getSolrClient().waitForState(col_name, 30, TimeUnit.SECONDS,
SolrCloudTestCase.clusterShape(1, 1));
// HACK implementation detail...
//
// we know that in the current implementation, waitForState invokes the predicate twice
// independently of the current state of the collection and/or wether the predicate succeeds.
// If this implementation detail changes, (ie: so that it's only invoked once)
// then this number needs to change -- but the test fundementally depends on the implementation
// calling the predicate at least once, which should also be neccessary for any future impl
// (to verify that it didn't "miss" the state change when creating the watcher)
final CountDownLatch latch = new CountDownLatch(2);
final Future<?> backgroundWaitForState = executor.submit
(() -> {
try {
cluster.getSolrClient().waitForState(col_name, 180, TimeUnit.SECONDS,
new LatchCountingPredicateWrapper(latch,
clusterShape(1, 0)));
} catch (Exception e) {
log.error("background thread got exception", e);
throw new RuntimeException(e);
}
return;
}, null);
log.info("Awaiting latch...");
if (! latch.await(120, TimeUnit.SECONDS)) {
fail("timed out Waiting a ridiculous amount of time for the waitForState latch -- did impl change?");
}
log.info("Shutdown 1 node");
final JettySolrRunner nodeToStop = cluster.getJettySolrRunner(0);
nodeToStop.stop();
log.info("Wait to confirm our node is fully shutdown");
cluster.waitForJettyToStop(nodeToStop);
// now that we're confident that node has stoped, check if a waitForState
// call will detect the missing replica -- shouldn't need long wait times...
log.info("Checking Future result to see if waitForState finished successfully");
try {
backgroundWaitForState.get();
} catch (ExecutionException e) {
log.error("background waitForState exception", e);
throw e;
}
} finally {
ExecutorUtil.shutdownAndAwaitTermination(executor);
cluster.shutdown();
}
}
public final class LatchCountingPredicateWrapper implements CollectionStatePredicate {
private final CountDownLatch latch;
private final CollectionStatePredicate inner;
public LatchCountingPredicateWrapper(final CountDownLatch latch, final CollectionStatePredicate inner) {
this.latch = latch;
this.inner = inner;
}
public boolean matches(Set<String> liveNodes, DocCollection collectionState) {
final boolean result = inner.matches(liveNodes, collectionState);
log.info("Predicate called: result={}, (pre)latch={}, liveNodes={}, state={}",
result, latch.getCount(), liveNodes, collectionState);
latch.countDown();
return result;
}
}
}

View File

@ -28,7 +28,7 @@ public class ZkStateReaderAccessor {
this.zkStateReader = zkStateReader; this.zkStateReader = zkStateReader;
} }
public Set<CollectionStateWatcher> getStateWatchers(String collection) { public Set<DocCollectionWatcher> getStateWatchers(String collection) {
return zkStateReader.getStateWatchers(collection); return zkStateReader.getStateWatchers(collection);
} }

View File

@ -42,6 +42,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import java.util.function.Supplier; import java.util.function.Supplier;
import org.apache.solr.client.solrj.ResponseParser; import org.apache.solr.client.solrj.ResponseParser;
@ -62,6 +63,7 @@ import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.CollectionStatePredicate; import org.apache.solr.common.cloud.CollectionStatePredicate;
import org.apache.solr.common.cloud.CollectionStateWatcher; import org.apache.solr.common.cloud.CollectionStateWatcher;
import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocCollectionWatcher;
import org.apache.solr.common.cloud.DocRouter; import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.ImplicitDocRouter; import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Replica;
@ -362,11 +364,21 @@ public abstract class BaseCloudSolrClient extends SolrClient {
} }
/** /**
* Block until a collection state matches a predicate, or a timeout * Block until a CollectionStatePredicate returns true, or the wait times out
* *
* <p>
* Note that the predicate may be called again even after it has returned true, so * Note that the predicate may be called again even after it has returned true, so
* implementors should avoid changing state within the predicate call itself. * implementors should avoid changing state within the predicate call itself.
* </p>
* *
* <p>
* This implementation utilizes {@link CollectionStateWatcher} internally.
* Callers that don't care about liveNodes are encouraged to use a {@link DocCollection} {@link Predicate}
* instead
* </p>
*
* @see #waitForState(String, long, TimeUnit, Predicate)
* @see #registerCollectionStateWatcher
* @param collection the collection to watch * @param collection the collection to watch
* @param wait how long to wait * @param wait how long to wait
* @param unit the units of the wait parameter * @param unit the units of the wait parameter
@ -379,14 +391,45 @@ public abstract class BaseCloudSolrClient extends SolrClient {
getClusterStateProvider().connect(); getClusterStateProvider().connect();
assertZKStateProvider().zkStateReader.waitForState(collection, wait, unit, predicate); assertZKStateProvider().zkStateReader.waitForState(collection, wait, unit, predicate);
} }
/**
* Block until a Predicate returns true, or the wait times out
*
* <p>
* Note that the predicate may be called again even after it has returned true, so
* implementors should avoid changing state within the predicate call itself.
* </p>
*
* @see #registerDocCollectionWatcher
* @param collection the collection to watch
* @param wait how long to wait
* @param unit the units of the wait parameter
* @param predicate a {@link Predicate} to test against the {@link DocCollection}
* @throws InterruptedException on interrupt
* @throws TimeoutException on timeout
*/
public void waitForState(String collection, long wait, TimeUnit unit, Predicate<DocCollection> predicate)
throws InterruptedException, TimeoutException {
getClusterStateProvider().connect();
assertZKStateProvider().zkStateReader.waitForState(collection, wait, unit, predicate);
}
/** /**
* Register a CollectionStateWatcher to be called when the cluster state for a collection changes * Register a CollectionStateWatcher to be called when the cluster state for a collection changes
* <em>or</em> the set of live nodes changes.
* *
* Note that the watcher is unregistered after it has been called once. To make a watcher persistent, * <p>
* it should re-register itself in its {@link CollectionStateWatcher#onStateChanged(Set, DocCollection)} * The Watcher will automatically be removed when it's
* call * <code>onStateChanged</code> returns <code>true</code>
* </p>
* *
* <p>
* This implementation utilizes {@link ZkStateReader#registerCollectionStateWatcher} internally.
* Callers that don't care about liveNodes are encouraged to use a {@link DocCollectionWatcher}
* instead
* </p>
*
* @see #registerDocCollectionWatcher(String, DocCollectionWatcher)
* @see ZkStateReader#registerCollectionStateWatcher
* @param collection the collection to watch * @param collection the collection to watch
* @param watcher a watcher that will be called when the state changes * @param watcher a watcher that will be called when the state changes
*/ */
@ -395,6 +438,23 @@ public abstract class BaseCloudSolrClient extends SolrClient {
assertZKStateProvider().zkStateReader.registerCollectionStateWatcher(collection, watcher); assertZKStateProvider().zkStateReader.registerCollectionStateWatcher(collection, watcher);
} }
/**
* Register a DocCollectionWatcher to be called when the cluster state for a collection changes.
*
* <p>
* The Watcher will automatically be removed when it's
* <code>onStateChanged</code> returns <code>true</code>
* </p>
*
* @see ZkStateReader#registerDocCollectionWatcher
* @param collection the collection to watch
* @param watcher a watcher that will be called when the state changes
*/
public void registerDocCollectionWatcher(String collection, DocCollectionWatcher watcher) {
getClusterStateProvider().connect();
assertZKStateProvider().zkStateReader.registerDocCollectionWatcher(collection, watcher);
}
private NamedList<Object> directUpdate(AbstractUpdateRequest request, String collection) throws SolrServerException { private NamedList<Object> directUpdate(AbstractUpdateRequest request, String collection) throws SolrServerException {
UpdateRequest updateRequest = (UpdateRequest) request; UpdateRequest updateRequest = (UpdateRequest) request;
SolrParams params = request.getParams(); SolrParams params = request.getParams();

View File

@ -19,23 +19,27 @@ package org.apache.solr.common.cloud;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
/** /**
* Interface to determine if a collection state matches a required state * Interface to determine if a set of liveNodes and a collection's state matches some expecatations.
* *
* @see ZkStateReader#waitForState(String, long, TimeUnit, CollectionStatePredicate) * @see ZkStateReader#waitForState(String, long, TimeUnit, CollectionStatePredicate)
* @see ZkStateReader#waitForState(String, long, TimeUnit, Predicate)
*/ */
public interface CollectionStatePredicate { public interface CollectionStatePredicate {
/** /**
* Check the collection state matches a required state * Check if the set of liveNodes <em>and</em> the collection state matches a required state
* * <p>
* Note that both liveNodes and collectionState should be consulted to determine * Note that both liveNodes and collectionState should be consulted to determine
* the overall state. * the overall state.
* </p>
* *
* @param liveNodes the current set of live nodes * @param liveNodes the current set of live nodes
* @param collectionState the latest collection state, or null if the collection * @param collectionState the latest collection state, or null if the collection
* does not exist * does not exist
* @return true if the input matches the requirements of this predicate
*/ */
boolean matches(Set<String> liveNodes, DocCollection collectionState); boolean matches(Set<String> liveNodes, DocCollection collectionState);

View File

@ -21,21 +21,24 @@ import java.util.Set;
/** /**
* Callback registered with {@link ZkStateReader#registerCollectionStateWatcher(String, CollectionStateWatcher)} * Callback registered with {@link ZkStateReader#registerCollectionStateWatcher(String, CollectionStateWatcher)}
* and called whenever the collection state changes. * and called whenever there is a change in the collection state <em>or</em> in the list of liveNodes.
*
* @see DocCollectionWatcher
*/ */
public interface CollectionStateWatcher { public interface CollectionStateWatcher {
/** /**
* Called when the collection we are registered against has a change of state. * Called when either the collection we are registered against has a change of state <em>or</em> there is a change to the live nodes of our collection.
* *
* <p>
* Note that, due to the way Zookeeper watchers are implemented, a single call may be * Note that, due to the way Zookeeper watchers are implemented, a single call may be
* the result of several state changes. Also, multiple calls to this method can be made * the result of several state changes. Also, multiple calls to this method can be made
* with the same state, ie. without any new updates. * with the same state, ie. without any new updates.
* </p>
* *
* @param liveNodes the set of live nodes * @param liveNodes the set of live nodes
* @param collectionState the new collection state (may be null if the collection has been * @param collectionState the new collection state (may be null if the collection has been
* deleted) * deleted)
*
* @return true if the watcher should be removed * @return true if the watcher should be removed
*/ */
boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState); boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState);

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.common.cloud;
/**
* Callback registered with {@link ZkStateReader#registerDocCollectionWatcher(String, DocCollectionWatcher)}
* and called whenever the DocCollection changes.
*/
public interface DocCollectionWatcher {
/**
* Called when the collection we are registered against has a change of state.
*
* <p>
* Note that, due to the way Zookeeper watchers are implemented, a single call may be
* the result of several state changes. Also, multiple calls to this method can be made
* with the same state, ie. without any new updates.
* </p>
*
* @param collection the new collection state (may be null if the collection has been deleted)
* @return true if the watcher should be removed
*/
boolean onStateChanged(DocCollection collection);
}

View File

@ -41,6 +41,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.function.UnaryOperator; import java.util.function.UnaryOperator;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -185,7 +186,7 @@ public class ZkStateReader implements SolrCloseable {
private final Runnable securityNodeListener; private final Runnable securityNodeListener;
private ConcurrentHashMap<String, CollectionWatch<CollectionStateWatcher>> collectionWatches = new ConcurrentHashMap<>(); private ConcurrentHashMap<String, CollectionWatch<DocCollectionWatcher>> collectionWatches = new ConcurrentHashMap<>();
// named this observers so there's less confusion between CollectionPropsWatcher map and the PropsWatcher map. // named this observers so there's less confusion between CollectionPropsWatcher map and the PropsWatcher map.
private ConcurrentHashMap<String, CollectionWatch<CollectionPropsWatcher>> collectionPropsObservers = new ConcurrentHashMap<>(); private ConcurrentHashMap<String, CollectionWatch<CollectionPropsWatcher>> collectionPropsObservers = new ConcurrentHashMap<>();
@ -590,7 +591,7 @@ public class ZkStateReader implements SolrCloseable {
notifyCloudCollectionsListeners(); notifyCloudCollectionsListeners();
for (String collection : changedCollections) { for (String collection : changedCollections) {
notifyStateWatchers(liveNodes, collection, clusterState.getCollectionOrNull(collection)); notifyStateWatchers(collection, clusterState.getCollectionOrNull(collection));
} }
} }
@ -1599,8 +1600,45 @@ public class ZkStateReader implements SolrCloseable {
/** /**
* Register a CollectionStateWatcher to be called when the state of a collection changes * Register a CollectionStateWatcher to be called when the state of a collection changes
* <em>or</em> the set of live nodes changes.
*
* <p>
* The Watcher will automatically be removed when it's
* <code>onStateChanged</code> returns <code>true</code>
* </p>
*
* <p>
* This is method is just syntactic sugar for registering both a {@link DocCollectionWatcher} and
* a {@link LiveNodesListener}. Callers that only care about one or the other (but not both) are
* encouraged to use the more specific methods register methods as it may reduce the number of
* ZooKeeper watchers needed, and reduce the amount of network/cpu used.
* </p>
*
* @see #registerDocCollectionWatcher
* @see #registerLiveNodesListener
*/ */
public void registerCollectionStateWatcher(String collection, CollectionStateWatcher stateWatcher) { public void registerCollectionStateWatcher(String collection, CollectionStateWatcher stateWatcher) {
final DocCollectionAndLiveNodesWatcherWrapper wrapper
= new DocCollectionAndLiveNodesWatcherWrapper(collection, stateWatcher);
registerDocCollectionWatcher(collection, wrapper);
registerLiveNodesListener(wrapper);
DocCollection state = clusterState.getCollectionOrNull(collection);
if (stateWatcher.onStateChanged(liveNodes, state) == true) {
removeCollectionStateWatcher(collection, stateWatcher);
}
}
/**
* Register a DocCollectionWatcher to be called when the state of a collection changes
*
* <p>
* The Watcher will automatically be removed when it's
* <code>onStateChanged</code> returns <code>true</code>
* </p>
*/
public void registerDocCollectionWatcher(String collection, DocCollectionWatcher stateWatcher) {
AtomicBoolean watchSet = new AtomicBoolean(false); AtomicBoolean watchSet = new AtomicBoolean(false);
collectionWatches.compute(collection, (k, v) -> { collectionWatches.compute(collection, (k, v) -> {
if (v == null) { if (v == null) {
@ -1616,17 +1654,27 @@ public class ZkStateReader implements SolrCloseable {
} }
DocCollection state = clusterState.getCollectionOrNull(collection); DocCollection state = clusterState.getCollectionOrNull(collection);
if (stateWatcher.onStateChanged(liveNodes, state) == true) { if (stateWatcher.onStateChanged(state) == true) {
removeCollectionStateWatcher(collection, stateWatcher); removeDocCollectionWatcher(collection, stateWatcher);
} }
} }
/** /**
* Block until a CollectionStatePredicate returns true, or the wait times out * Block until a CollectionStatePredicate returns true, or the wait times out
* *
* <p>
* Note that the predicate may be called again even after it has returned true, so * Note that the predicate may be called again even after it has returned true, so
* implementors should avoid changing state within the predicate call itself. * implementors should avoid changing state within the predicate call itself.
* </p>
* *
* <p>
* This implementation utilizes {@link CollectionStateWatcher} internally.
* Callers that don't care about liveNodes are encouraged to use a {@link DocCollection} {@link Predicate}
* instead
* </p>
*
* @see #waitForState(String, long, TimeUnit, Predicate)
* @see #registerCollectionStateWatcher
* @param collection the collection to watch * @param collection the collection to watch
* @param wait how long to wait * @param wait how long to wait
* @param unit the units of the wait parameter * @param unit the units of the wait parameter
@ -1667,11 +1715,58 @@ public class ZkStateReader implements SolrCloseable {
} }
/** /**
* Block until a LiveNodesStatePredicate returns true, or the wait times out * Block until a Predicate returns true, or the wait times out
* *
* <p>
* Note that the predicate may be called again even after it has returned true, so * Note that the predicate may be called again even after it has returned true, so
* implementors should avoid changing state within the predicate call itself. * implementors should avoid changing state within the predicate call itself.
* </p>
* *
* @param collection the collection to watch
* @param wait how long to wait
* @param unit the units of the wait parameter
* @param predicate the predicate to call on state changes
* @throws InterruptedException on interrupt
* @throws TimeoutException on timeout
*/
public void waitForState(final String collection, long wait, TimeUnit unit, Predicate<DocCollection> predicate)
throws InterruptedException, TimeoutException {
if (closed) {
throw new AlreadyClosedException();
}
final CountDownLatch latch = new CountDownLatch(1);
waitLatches.add(latch);
AtomicReference<DocCollection> docCollection = new AtomicReference<>();
DocCollectionWatcher watcher = (c) -> {
docCollection.set(c);
boolean matches = predicate.test(c);
if (matches)
latch.countDown();
return matches;
};
registerDocCollectionWatcher(collection, watcher);
try {
// wait for the watcher predicate to return true, or time out
if (!latch.await(wait, unit))
throw new TimeoutException("Timeout waiting to see state for collection=" + collection + " :" + docCollection.get());
}
finally {
removeDocCollectionWatcher(collection, watcher);
waitLatches.remove(latch);
}
}
/**
* Block until a LiveNodesStatePredicate returns true, or the wait times out
* <p>
* Note that the predicate may be called again even after it has returned true, so
* implementors should avoid changing state within the predicate call itself.
* </p>
* @param wait how long to wait * @param wait how long to wait
* @param unit the units of the wait parameter * @param unit the units of the wait parameter
* @param predicate the predicate to call on state changes * @param predicate the predicate to call on state changes
@ -1713,14 +1808,35 @@ public class ZkStateReader implements SolrCloseable {
/** /**
* Remove a watcher from a collection's watch list. * Remove a watcher from a collection's watch list.
* * <p>
* This allows Zookeeper watches to be removed if there is no interest in the * This allows Zookeeper watches to be removed if there is no interest in the
* collection. * collection.
* </p>
* *
* @see #registerCollectionStateWatcher
* @param collection the collection * @param collection the collection
* @param watcher the watcher * @param watcher the watcher
*/ */
public void removeCollectionStateWatcher(String collection, CollectionStateWatcher watcher) { public void removeCollectionStateWatcher(String collection, CollectionStateWatcher watcher) {
final DocCollectionAndLiveNodesWatcherWrapper wrapper
= new DocCollectionAndLiveNodesWatcherWrapper(collection, watcher);
removeDocCollectionWatcher(collection, wrapper);
removeLiveNodesListener(wrapper);
}
/**
* Remove a watcher from a collection's watch list.
* <p>
* This allows Zookeeper watches to be removed if there is no interest in the
* collection.
* </p>
*
* @see #registerDocCollectionWatcher
* @param collection the collection
* @param watcher the watcher
*/
public void removeDocCollectionWatcher(String collection, DocCollectionWatcher watcher) {
AtomicBoolean reconstructState = new AtomicBoolean(false); AtomicBoolean reconstructState = new AtomicBoolean(false);
collectionWatches.compute(collection, (k, v) -> { collectionWatches.compute(collection, (k, v) -> {
if (v == null) if (v == null)
@ -1742,8 +1858,8 @@ public class ZkStateReader implements SolrCloseable {
} }
/* package-private for testing */ /* package-private for testing */
Set<CollectionStateWatcher> getStateWatchers(String collection) { Set<DocCollectionWatcher> getStateWatchers(String collection) {
final Set<CollectionStateWatcher> watchers = new HashSet<>(); final Set<DocCollectionWatcher> watchers = new HashSet<>();
collectionWatches.compute(collection, (k, v) -> { collectionWatches.compute(collection, (k, v) -> {
if (v != null) { if (v != null) {
watchers.addAll(v.stateWatchers); watchers.addAll(v.stateWatchers);
@ -1845,12 +1961,12 @@ public class ZkStateReader implements SolrCloseable {
} }
} }
private void notifyStateWatchers(Set<String> liveNodes, String collection, DocCollection collectionState) { private void notifyStateWatchers(String collection, DocCollection collectionState) {
if (this.closed) { if (this.closed) {
return; return;
} }
try { try {
notifications.submit(new Notification(liveNodes, collection, collectionState)); notifications.submit(new Notification(collection, collectionState));
} }
catch (RejectedExecutionException e) { catch (RejectedExecutionException e) {
if (closed == false) { if (closed == false) {
@ -1861,29 +1977,27 @@ public class ZkStateReader implements SolrCloseable {
private class Notification implements Runnable { private class Notification implements Runnable {
final Set<String> liveNodes;
final String collection; final String collection;
final DocCollection collectionState; final DocCollection collectionState;
private Notification(Set<String> liveNodes, String collection, DocCollection collectionState) { private Notification(String collection, DocCollection collectionState) {
this.liveNodes = liveNodes;
this.collection = collection; this.collection = collection;
this.collectionState = collectionState; this.collectionState = collectionState;
} }
@Override @Override
public void run() { public void run() {
List<CollectionStateWatcher> watchers = new ArrayList<>(); List<DocCollectionWatcher> watchers = new ArrayList<>();
collectionWatches.compute(collection, (k, v) -> { collectionWatches.compute(collection, (k, v) -> {
if (v == null) if (v == null)
return null; return null;
watchers.addAll(v.stateWatchers); watchers.addAll(v.stateWatchers);
return v; return v;
}); });
for (CollectionStateWatcher watcher : watchers) { for (DocCollectionWatcher watcher : watchers) {
try { try {
if (watcher.onStateChanged(liveNodes, collectionState)) { if (watcher.onStateChanged(collectionState)) {
removeCollectionStateWatcher(collection, watcher); removeDocCollectionWatcher(collection, watcher);
} }
} catch (Exception exception) { } catch (Exception exception) {
log.warn("Error on calling watcher", exception); log.warn("Error on calling watcher", exception);
@ -2118,4 +2232,54 @@ public class ZkStateReader implements SolrCloseable {
} }
} }
/**
* Helper class that acts as both a {@link DocCollectionWatcher} and a {@link LiveNodesListener}
* while wraping and delegating to a {@link CollectionStateWatcher}
*/
private final class DocCollectionAndLiveNodesWatcherWrapper implements DocCollectionWatcher, LiveNodesListener {
private final String collectionName;
private final CollectionStateWatcher delegate;
public int hashCode() {
return collectionName.hashCode() * delegate.hashCode();
}
public boolean equals(Object other) {
if (other instanceof DocCollectionAndLiveNodesWatcherWrapper) {
DocCollectionAndLiveNodesWatcherWrapper that
= (DocCollectionAndLiveNodesWatcherWrapper) other;
return this.collectionName.equals(that.collectionName)
&& this.delegate.equals(that.delegate);
}
return false;
}
public DocCollectionAndLiveNodesWatcherWrapper(final String collectionName,
final CollectionStateWatcher delegate) {
this.collectionName = collectionName;
this.delegate = delegate;
}
@Override
public boolean onStateChanged(DocCollection collectionState) {
final boolean result = delegate.onStateChanged(ZkStateReader.this.liveNodes,
collectionState);
if (result) {
// it might be a while before live nodes changes, so proactively remove ourselves
removeLiveNodesListener(this);
}
return result;
}
@Override
public boolean onChange(SortedSet<String> oldLiveNodes, SortedSet<String> newLiveNodes) {
final DocCollection collection = ZkStateReader.this.clusterState.getCollectionOrNull(collectionName);
final boolean result = delegate.onStateChanged(newLiveNodes, collection);
if (result) {
// it might be a while before collection changes, so proactively remove ourselves
removeDocCollectionWatcher(collectionName, this);
}
return result;
}
}
} }

View File

@ -37,20 +37,22 @@ import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/** @see TestDocCollectionWatcher */
public class TestCollectionStateWatchers extends SolrCloudTestCase { public class TestCollectionStateWatchers extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final int CLUSTER_SIZE = 4; private static final int CLUSTER_SIZE = 4;
private static final int MAX_WAIT_TIMEOUT = 30;
private static final int MAX_WAIT_TIMEOUT = 120; // seconds, only use for await -- NO SLEEP!!!
private ExecutorService executor = null; private ExecutorService executor = null;
@Before @Before
public void prepareCluster() throws Exception { public void prepareCluster() throws Exception {
configureCluster(CLUSTER_SIZE) configureCluster(CLUSTER_SIZE)
.addConfig("config", getFile("solrj/solr/collection1/conf").toPath()) .addConfig("config", getFile("solrj/solr/collection1/conf").toPath())
.configure(); .configure();
executor = ExecutorUtil.newMDCAwareCachedThreadPool("backgroundWatchers"); executor = ExecutorUtil.newMDCAwareCachedThreadPool("backgroundWatchers");
} }
@ -75,6 +77,7 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
private void waitFor(String message, long timeout, TimeUnit unit, Callable<Boolean> predicate) private void waitFor(String message, long timeout, TimeUnit unit, Callable<Boolean> predicate)
throws InterruptedException, ExecutionException { throws InterruptedException, ExecutionException {
Future<Boolean> future = executor.submit(() -> { Future<Boolean> future = executor.submit(() -> {
try { try {
while (true) { while (true) {
@ -100,50 +103,71 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
} }
@Test @Test
//Commented 14-Oct-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 12-Jun-2018 public void testCollectionWatchWithShutdownOfActiveNode() throws Exception {
public void testSimpleCollectionWatch() throws Exception { doTestCollectionWatchWithNodeShutdown(false);
}
@Test
public void testCollectionWatchWithShutdownOfUnusedNode() throws Exception {
doTestCollectionWatchWithNodeShutdown(true);
}
private void doTestCollectionWatchWithNodeShutdown(final boolean shutdownUnusedNode)
throws Exception {
CloudSolrClient client = cluster.getSolrClient(); CloudSolrClient client = cluster.getSolrClient();
CollectionAdminRequest.createCollection("testcollection", "config", 4, 1)
.processAndWait(client, MAX_WAIT_TIMEOUT); // note: one node in our cluster is unsed by collection
CollectionAdminRequest.createCollection("testcollection", "config", CLUSTER_SIZE, 1)
.processAndWait(client, MAX_WAIT_TIMEOUT);
client.waitForState("testcollection", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, client.waitForState("testcollection", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(n, c) -> DocCollection.isFullyActive(n, c, 4, 1)); (n, c) -> DocCollection.isFullyActive(n, c, CLUSTER_SIZE, 1));
final JettySolrRunner extraJetty = cluster.startJettySolrRunner();
final JettySolrRunner jettyToShutdown
= shutdownUnusedNode ? extraJetty : cluster.getJettySolrRunners().get(0);
final int expectedNodesWithActiveReplicas = CLUSTER_SIZE - (shutdownUnusedNode ? 0 : 1);
cluster.waitForAllNodes(MAX_WAIT_TIMEOUT);
// shutdown a node and check that we get notified about the change // shutdown a node and check that we get notified about the change
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
client.registerCollectionStateWatcher("testcollection", (liveNodes, collectionState) -> { client.registerCollectionStateWatcher("testcollection", (liveNodes, collectionState) -> {
int nodeCount = 0; int nodesWithActiveReplicas = 0;
log.info("State changed: {}", collectionState); log.info("State changed: {}", collectionState);
for (Slice slice : collectionState) { for (Slice slice : collectionState) {
for (Replica replica : slice) { for (Replica replica : slice) {
if (replica.isActive(liveNodes)) if (replica.isActive(liveNodes))
nodeCount++; nodesWithActiveReplicas++;
} }
} }
if (nodeCount == 3) { if (liveNodes.size() == CLUSTER_SIZE
&& expectedNodesWithActiveReplicas == nodesWithActiveReplicas) {
latch.countDown(); latch.countDown();
return true; return true;
} }
return false; return false;
}); });
JettySolrRunner j = cluster.stopJettySolrRunner(random().nextInt(cluster.getJettySolrRunners().size())); cluster.stopJettySolrRunner(jettyToShutdown);
cluster.waitForJettyToStop(j); cluster.waitForJettyToStop(jettyToShutdown);
assertTrue("CollectionStateWatcher was never notified of cluster change", latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
waitFor("CollectionStateWatcher wasn't cleared after completion", 1, TimeUnit.SECONDS, assertTrue("CollectionStateWatcher was never notified of cluster change",
() -> client.getZkStateReader().getStateWatchers("testcollection").isEmpty()); latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
waitFor("CollectionStateWatcher wasn't cleared after completion",
MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
() -> client.getZkStateReader().getStateWatchers("testcollection").isEmpty());
} }
@Test @Test
// commented 20-July-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 12-Jun-2018
public void testStateWatcherChecksCurrentStateOnRegister() throws Exception { public void testStateWatcherChecksCurrentStateOnRegister() throws Exception {
CloudSolrClient client = cluster.getSolrClient(); CloudSolrClient client = cluster.getSolrClient();
CollectionAdminRequest.createCollection("currentstate", "config", 1, 1) CollectionAdminRequest.createCollection("currentstate", "config", 1, 1)
.processAndWait(client, MAX_WAIT_TIMEOUT); .processAndWait(client, MAX_WAIT_TIMEOUT);
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
client.registerCollectionStateWatcher("currentstate", (n, c) -> { client.registerCollectionStateWatcher("currentstate", (n, c) -> {
@ -151,9 +175,10 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
return false; return false;
}); });
assertTrue("CollectionStateWatcher isn't called on new registration", latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS)); assertTrue("CollectionStateWatcher isn't called on new registration",
latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
assertEquals("CollectionStateWatcher should be retained", assertEquals("CollectionStateWatcher should be retained",
1, client.getZkStateReader().getStateWatchers("currentstate").size()); 1, client.getZkStateReader().getStateWatchers("currentstate").size());
final CountDownLatch latch2 = new CountDownLatch(1); final CountDownLatch latch2 = new CountDownLatch(1);
client.registerCollectionStateWatcher("currentstate", (n, c) -> { client.registerCollectionStateWatcher("currentstate", (n, c) -> {
@ -162,9 +187,9 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
}); });
assertTrue("CollectionStateWatcher isn't called when registering for already-watched collection", assertTrue("CollectionStateWatcher isn't called when registering for already-watched collection",
latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS)); latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
waitFor("CollectionStateWatcher should be removed", 1, TimeUnit.SECONDS, waitFor("CollectionStateWatcher should be removed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
() -> client.getZkStateReader().getStateWatchers("currentstate").size() == 1); () -> client.getZkStateReader().getStateWatchers("currentstate").size() == 1);
} }
@Test @Test
@ -172,17 +197,17 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
CloudSolrClient client = cluster.getSolrClient(); CloudSolrClient client = cluster.getSolrClient();
CollectionAdminRequest.createCollection("waitforstate", "config", 1, 1) CollectionAdminRequest.createCollection("waitforstate", "config", 1, 1)
.processAndWait(client, MAX_WAIT_TIMEOUT); .processAndWait(client, MAX_WAIT_TIMEOUT);
client.waitForState("waitforstate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, client.waitForState("waitforstate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(n, c) -> DocCollection.isFullyActive(n, c, 1, 1)); (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
// several goes, to check that we're not getting delayed state changes // several goes, to check that we're not getting delayed state changes
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
try { try {
client.waitForState("waitforstate", 1, TimeUnit.SECONDS, (n, c) -> DocCollection.isFullyActive(n, c, 1, 1)); client.waitForState("waitforstate", 1, TimeUnit.SECONDS,
} (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
catch (TimeoutException e) { } catch (TimeoutException e) {
fail("waitForState should return immediately if the predicate is already satisfied"); fail("waitForState should return immediately if the predicate is already satisfied");
} }
} }
@ -190,114 +215,145 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
} }
@Test @Test
// commented 20-July-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 12-Jun-2018
@BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // annotated on: 24-Dec-2018
public void testCanWaitForNonexistantCollection() throws Exception { public void testCanWaitForNonexistantCollection() throws Exception {
Future<Boolean> future = waitInBackground("delayed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, Future<Boolean> future = waitInBackground("delayed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(n, c) -> DocCollection.isFullyActive(n, c, 1, 1)); (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
CollectionAdminRequest.createCollection("delayed", "config", 1, 1) CollectionAdminRequest.createCollection("delayed", "config", 1, 1)
.processAndWait(cluster.getSolrClient(), MAX_WAIT_TIMEOUT); .processAndWait(cluster.getSolrClient(), MAX_WAIT_TIMEOUT);
assertTrue("waitForState was not triggered by collection creation", future.get()); assertTrue("waitForState was not triggered by collection creation", future.get());
} }
@Test @Test
// commented 20-July-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 12-Jun-2018
public void testPredicateFailureTimesOut() throws Exception { public void testPredicateFailureTimesOut() throws Exception {
CloudSolrClient client = cluster.getSolrClient(); CloudSolrClient client = cluster.getSolrClient();
expectThrows(TimeoutException.class, () -> { expectThrows(TimeoutException.class, () -> {
client.waitForState("nosuchcollection", 1, TimeUnit.SECONDS, ((liveNodes, collectionState) -> false)); client.waitForState("nosuchcollection", 1, TimeUnit.SECONDS,
((liveNodes, collectionState) -> false));
}); });
waitFor("Watchers for collection should be removed after timeout", 1, TimeUnit.SECONDS, waitFor("Watchers for collection should be removed after timeout",
() -> client.getZkStateReader().getStateWatchers("nosuchcollection").isEmpty()); MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
() -> client.getZkStateReader().getStateWatchers("nosuchcollection").isEmpty());
} }
@Test @Test
//Commented 14-Oct-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 12-Jun-2018
public void testWaitForStateWatcherIsRetainedOnPredicateFailure() throws Exception { public void testWaitForStateWatcherIsRetainedOnPredicateFailure() throws Exception {
CloudSolrClient client = cluster.getSolrClient(); CloudSolrClient client = cluster.getSolrClient();
CollectionAdminRequest.createCollection("falsepredicate", "config", 4, 1) CollectionAdminRequest.createCollection("falsepredicate", "config", 4, 1)
.processAndWait(client, MAX_WAIT_TIMEOUT); .processAndWait(client, MAX_WAIT_TIMEOUT);
client.waitForState("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, client.waitForState("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(n, c) -> DocCollection.isFullyActive(n, c, 4, 1)); (n, c) -> DocCollection.isFullyActive(n, c, 4, 1));
final CountDownLatch firstCall = new CountDownLatch(1); final CountDownLatch firstCall = new CountDownLatch(1);
// stop a node, then add a watch waiting for all nodes to be back up // stop a node, then add a watch waiting for all nodes to be back up
JettySolrRunner node1 = cluster.stopJettySolrRunner(random().nextInt(cluster.getJettySolrRunners().size())); JettySolrRunner node1 = cluster.stopJettySolrRunner(random().nextInt
(cluster.getJettySolrRunners().size()));
cluster.waitForJettyToStop(node1); cluster.waitForJettyToStop(node1);
Future<Boolean> future = waitInBackground("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, (liveNodes, collectionState) -> { Future<Boolean> future = waitInBackground("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(liveNodes, collectionState) -> {
firstCall.countDown(); firstCall.countDown();
return DocCollection.isFullyActive(liveNodes, collectionState, 4, 1); return DocCollection.isFullyActive(liveNodes, collectionState, 4, 1);
}); });
// first, stop another node; the watch should not be fired after this! // first, stop another node; the watch should not be fired after this!
JettySolrRunner node2 = cluster.stopJettySolrRunner(random().nextInt(cluster.getJettySolrRunners().size())); JettySolrRunner node2 = cluster.stopJettySolrRunner(random().nextInt
(cluster.getJettySolrRunners().size()));
// now start them both back up // now start them both back up
cluster.startJettySolrRunner(node1); cluster.startJettySolrRunner(node1);
assertTrue("CollectionStateWatcher not called after 30 seconds", firstCall.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS)); assertTrue("CollectionStateWatcher not called",
firstCall.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
cluster.startJettySolrRunner(node2); cluster.startJettySolrRunner(node2);
Boolean result = future.get(); Boolean result = future.get();
assertTrue("Did not see a fully active cluster after 30 seconds", result); assertTrue("Did not see a fully active cluster", result);
} }
@Test @Test
// commented 20-July-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 12-Jun-2018
public void testWatcherIsRemovedAfterTimeout() throws Exception { public void testWatcherIsRemovedAfterTimeout() throws Exception {
CloudSolrClient client = cluster.getSolrClient(); CloudSolrClient client = cluster.getSolrClient();
assertTrue("There should be no watchers for a non-existent collection!", assertTrue("There should be no watchers for a non-existent collection!",
client.getZkStateReader().getStateWatchers("no-such-collection").isEmpty()); client.getZkStateReader().getStateWatchers("no-such-collection").isEmpty());
expectThrows(TimeoutException.class, () -> { expectThrows(TimeoutException.class, () -> {
client.waitForState("no-such-collection", 10, TimeUnit.MILLISECONDS, (n, c) -> DocCollection.isFullyActive(n, c, 1, 1)); client.waitForState("no-such-collection", 10, TimeUnit.MILLISECONDS,
(n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
}); });
waitFor("Watchers for collection should be removed after timeout", 1, TimeUnit.SECONDS, waitFor("Watchers for collection should be removed after timeout",
() -> client.getZkStateReader().getStateWatchers("no-such-collection").isEmpty()); MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
() -> client.getZkStateReader().getStateWatchers("no-such-collection").isEmpty());
} }
@Test @Test
// commented 20-July-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 09-Apr-2018
public void testDeletionsTriggerWatches() throws Exception { public void testDeletionsTriggerWatches() throws Exception {
CollectionAdminRequest.createCollection("tobedeleted", "config", 1, 1) CollectionAdminRequest.createCollection("tobedeleted", "config", 1, 1)
.process(cluster.getSolrClient()); .process(cluster.getSolrClient());
Future<Boolean> future = waitInBackground("tobedeleted", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, (l, c) -> c == null);
Future<Boolean> future = waitInBackground("tobedeleted", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(l, c) -> c == null);
CollectionAdminRequest.deleteCollection("tobedeleted").process(cluster.getSolrClient()); CollectionAdminRequest.deleteCollection("tobedeleted").process(cluster.getSolrClient());
assertTrue("CollectionStateWatcher not notified of delete call after 30 seconds", future.get()); assertTrue("CollectionStateWatcher not notified of delete call", future.get());
}
@Test
public void testLiveNodeChangesTriggerWatches() throws Exception {
final CloudSolrClient client = cluster.getSolrClient();
CollectionAdminRequest.createCollection("test_collection", "config", 1, 1).process(client);
Future<Boolean> future = waitInBackground("test_collection", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(l, c) -> (l.size() == 1 + CLUSTER_SIZE));
JettySolrRunner unusedJetty = cluster.startJettySolrRunner();
assertTrue("CollectionStateWatcher not notified of new node", future.get());
waitFor("CollectionStateWatcher should be removed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
() -> client.getZkStateReader().getStateWatchers("test_collection").size() == 0);
future = waitInBackground("test_collection", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(l, c) -> (l.size() == CLUSTER_SIZE));
cluster.stopJettySolrRunner(unusedJetty);
assertTrue("CollectionStateWatcher not notified of node lost", future.get());
waitFor("CollectionStateWatcher should be removed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
() -> client.getZkStateReader().getStateWatchers("test_collection").size() == 0);
} }
@Test @Test
//Commented 14-Oct-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 12-Jun-2018
public void testWatchesWorkForStateFormat1() throws Exception { public void testWatchesWorkForStateFormat1() throws Exception {
final CloudSolrClient client = cluster.getSolrClient(); final CloudSolrClient client = cluster.getSolrClient();
Future<Boolean> future = waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, Future<Boolean> future = waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(n, c) -> DocCollection.isFullyActive(n, c, 1, 1)); (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
CollectionAdminRequest.createCollection("stateformat1", "config", 1, 1).setStateFormat(1) CollectionAdminRequest.createCollection("stateformat1", "config", 1, 1).setStateFormat(1)
.processAndWait(client, MAX_WAIT_TIMEOUT); .processAndWait(client, MAX_WAIT_TIMEOUT);
assertTrue("CollectionStateWatcher not notified of stateformat=1 collection creation", future.get()); assertTrue("CollectionStateWatcher not notified of stateformat=1 collection creation",
future.get());
Future<Boolean> migrated Future<Boolean> migrated = waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
= waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, (n, c) -> c != null && c.getStateFormat() == 2);
(n, c) -> c != null && c.getStateFormat() == 2);
CollectionAdminRequest.migrateCollectionFormat("stateformat1").processAndWait(client, MAX_WAIT_TIMEOUT); CollectionAdminRequest.migrateCollectionFormat("stateformat1")
.processAndWait(client, MAX_WAIT_TIMEOUT);
assertTrue("CollectionStateWatcher did not persist over state format migration", migrated.get()); assertTrue("CollectionStateWatcher did not persist over state format migration", migrated.get());
} }

View File

@ -0,0 +1,291 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.common.cloud;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.util.ExecutorUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** @see TestCollectionStateWatchers */
public class TestDocCollectionWatcher extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final int CLUSTER_SIZE = 4;
private static final int MAX_WAIT_TIMEOUT = 120; // seconds, only use for await -- NO SLEEP!!!
private ExecutorService executor = null;
@Before
public void prepareCluster() throws Exception {
configureCluster(CLUSTER_SIZE)
.addConfig("config", getFile("solrj/solr/collection1/conf").toPath())
.configure();
executor = ExecutorUtil.newMDCAwareCachedThreadPool("backgroundWatchers");
}
@After
public void tearDownCluster() throws Exception {
executor.shutdown();
shutdownCluster();
executor = null;
}
private Future<Boolean> waitInBackground(String collection, long timeout, TimeUnit unit,
Predicate<DocCollection> predicate) {
return executor.submit(() -> {
try {
cluster.getSolrClient().waitForState(collection, timeout, unit, predicate);
} catch (InterruptedException | TimeoutException e) {
return Boolean.FALSE;
}
return Boolean.TRUE;
});
}
private void waitFor(String message, long timeout, TimeUnit unit, Callable<Boolean> predicate)
throws InterruptedException, ExecutionException {
Future<Boolean> future = executor.submit(() -> {
try {
while (true) {
if (predicate.call())
return true;
TimeUnit.MILLISECONDS.sleep(10);
}
}
catch (InterruptedException e) {
return false;
}
});
try {
if (future.get(timeout, unit) == true) {
return;
}
}
catch (TimeoutException e) {
// pass failure message on
}
future.cancel(true);
fail(message);
}
@Test
public void testStateWatcherChecksCurrentStateOnRegister() throws Exception {
CloudSolrClient client = cluster.getSolrClient();
CollectionAdminRequest.createCollection("currentstate", "config", 1, 1)
.processAndWait(client, MAX_WAIT_TIMEOUT);
final CountDownLatch latch = new CountDownLatch(1);
client.registerDocCollectionWatcher("currentstate", (c) -> {
latch.countDown();
return false;
});
assertTrue("DocCollectionWatcher isn't called on new registration",
latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
assertEquals("DocCollectionWatcher should be retained",
1, client.getZkStateReader().getStateWatchers("currentstate").size());
final CountDownLatch latch2 = new CountDownLatch(1);
client.registerDocCollectionWatcher("currentstate", (c) -> {
latch2.countDown();
return true;
});
assertTrue("DocCollectionWatcher isn't called when registering for already-watched collection",
latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
waitFor("DocCollectionWatcher should be removed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
() -> client.getZkStateReader().getStateWatchers("currentstate").size() == 1);
}
@Test
public void testWaitForStateChecksCurrentState() throws Exception {
CloudSolrClient client = cluster.getSolrClient();
CollectionAdminRequest.createCollection("waitforstate", "config", 1, 1)
.processAndWait(client, MAX_WAIT_TIMEOUT);
client.waitForState("waitforstate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
// several goes, to check that we're not getting delayed state changes
for (int i = 0; i < 10; i++) {
try {
client.waitForState("waitforstate", 1, TimeUnit.SECONDS,
(n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
} catch (TimeoutException e) {
fail("waitForState should return immediately if the predicate is already satisfied");
}
}
}
@Test
public void testCanWaitForNonexistantCollection() throws Exception {
Future<Boolean> future = waitInBackground("delayed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(c) -> (null != c));
CollectionAdminRequest.createCollection("delayed", "config", 1, 1)
.processAndWait(cluster.getSolrClient(), MAX_WAIT_TIMEOUT);
assertTrue("waitForState was not triggered by collection creation", future.get());
}
@Test
public void testPredicateFailureTimesOut() throws Exception {
CloudSolrClient client = cluster.getSolrClient();
expectThrows(TimeoutException.class, () -> {
client.waitForState("nosuchcollection", 1, TimeUnit.SECONDS,
((liveNodes, collectionState) -> false));
});
waitFor("Watchers for collection should be removed after timeout",
MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
() -> client.getZkStateReader().getStateWatchers("nosuchcollection").isEmpty());
}
@Test
public void testWaitForStateWatcherIsRetainedOnPredicateFailure() throws Exception {
CloudSolrClient client = cluster.getSolrClient();
CollectionAdminRequest.createCollection("falsepredicate", "config", 1, 1)
.processAndWait(client, MAX_WAIT_TIMEOUT);
// create collection with 1 shard 1 replica...
client.waitForState("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
// set watcher waiting for at least 3 replicas (will fail initially)
final AtomicInteger runCount = new AtomicInteger(0);
final Future<Boolean> future = waitInBackground
("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(collectionState) -> {
runCount.incrementAndGet();
int replicas = 0;
for (Slice slice : collectionState) {
for (Replica replica : slice) {
replicas++;
}
}
return 3 <= replicas;
});
// add a 2nd replica...
CollectionAdminRequest.addReplicaToShard("falsepredicate", "shard1")
.processAndWait(client, MAX_WAIT_TIMEOUT);
client.waitForState("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(n, c) -> DocCollection.isFullyActive(n, c, 1, 2));
// confirm watcher has run at least once and has been retained...
final int runCountSnapshot = runCount.get();
assertTrue(0 < runCountSnapshot);
assertEquals(1, client.getZkStateReader().getStateWatchers("falsepredicate").size());
// now add a 3rd replica...
CollectionAdminRequest.addReplicaToShard("falsepredicate", "shard1")
.processAndWait(client, MAX_WAIT_TIMEOUT);
// now confirm watcher is invoked & removed
assertTrue("watcher never succeeded", future.get());
assertTrue(runCountSnapshot < runCount.get());
waitFor("DocCollectionWatcher should be removed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
() -> client.getZkStateReader().getStateWatchers("falsepredicate").size() == 0);
}
@Test
public void testWatcherIsRemovedAfterTimeout() throws Exception {
CloudSolrClient client = cluster.getSolrClient();
assertTrue("There should be no watchers for a non-existent collection!",
client.getZkStateReader().getStateWatchers("no-such-collection").isEmpty());
expectThrows(TimeoutException.class, () -> {
client.waitForState("no-such-collection", 10, TimeUnit.MILLISECONDS,
(c) -> (false));
});
waitFor("Watchers for collection should be removed after timeout",
MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
() -> client.getZkStateReader().getStateWatchers("no-such-collection").isEmpty());
}
@Test
public void testDeletionsTriggerWatches() throws Exception {
final CloudSolrClient client = cluster.getSolrClient();
CollectionAdminRequest.createCollection("tobedeleted", "config", 1, 1).process(client);
client.waitForState("tobedeleted", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
Future<Boolean> future = waitInBackground("tobedeleted", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(c) -> c == null);
CollectionAdminRequest.deleteCollection("tobedeleted").process(client);
assertTrue("DocCollectionWatcher not notified of delete call", future.get());
}
@Test
public void testWatchesWorkForStateFormat1() throws Exception {
final CloudSolrClient client = cluster.getSolrClient();
Future<Boolean> future = waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(c) -> (null != c) );
CollectionAdminRequest.createCollection("stateformat1", "config", 1, 1).setStateFormat(1)
.processAndWait(client, MAX_WAIT_TIMEOUT);
client.waitForState("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
assertTrue("DocCollectionWatcher not notified of stateformat=1 collection creation",
future.get());
Future<Boolean> migrated = waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(c) -> c != null && c.getStateFormat() == 2);
CollectionAdminRequest.migrateCollectionFormat("stateformat1")
.processAndWait(client, MAX_WAIT_TIMEOUT);
assertTrue("DocCollectionWatcher did not persist over state format migration", migrated.get());
}
}

View File

@ -205,7 +205,7 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
throws Exception { throws Exception {
log.info("Wait for collection to disappear - collection: " + collection + " failOnTimeout:" + failOnTimeout + " timeout (sec):" + timeoutSeconds); log.info("Wait for collection to disappear - collection: " + collection + " failOnTimeout:" + failOnTimeout + " timeout (sec):" + timeoutSeconds);
zkStateReader.waitForState(collection, timeoutSeconds, TimeUnit.SECONDS, (liveNodes, docCollection) -> { zkStateReader.waitForState(collection, timeoutSeconds, TimeUnit.SECONDS, (docCollection) -> {
if (docCollection == null) if (docCollection == null)
return true; return true;
return false; return false;
@ -237,7 +237,7 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
Thread.sleep(100); Thread.sleep(100);
} }
zkStateReader.waitForState("collection1", timeOut.timeLeft(SECONDS), TimeUnit.SECONDS, (liveNodes, docCollection) -> { zkStateReader.waitForState("collection1", timeOut.timeLeft(SECONDS), TimeUnit.SECONDS, (docCollection) -> {
if (docCollection == null) if (docCollection == null)
return false; return false;
@ -253,7 +253,7 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
public static void verifyReplicaStatus(ZkStateReader reader, String collection, String shard, String coreNodeName, public static void verifyReplicaStatus(ZkStateReader reader, String collection, String shard, String coreNodeName,
Replica.State expectedState) throws InterruptedException, TimeoutException { Replica.State expectedState) throws InterruptedException, TimeoutException {
reader.waitForState(collection, 15000, TimeUnit.MILLISECONDS, reader.waitForState(collection, 15000, TimeUnit.MILLISECONDS,
(liveNodes, collectionState) -> collectionState != null && collectionState.getSlice(shard) != null (collectionState) -> collectionState != null && collectionState.getSlice(shard) != null
&& collectionState.getSlice(shard).getReplicasMap().get(coreNodeName) != null && collectionState.getSlice(shard).getReplicasMap().get(coreNodeName) != null
&& collectionState.getSlice(shard).getReplicasMap().get(coreNodeName).getState() == expectedState); && collectionState.getSlice(shard).getReplicasMap().get(coreNodeName).getState() == expectedState);
} }

View File

@ -405,7 +405,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
.setCreateNodeSet("") .setCreateNodeSet("")
.process(cloudClient).getStatus()); .process(cloudClient).getStatus());
cloudClient.waitForState(DEFAULT_COLLECTION, 30, TimeUnit.SECONDS, (l,c) -> c != null && c.getSlices().size() == sliceCount); cloudClient.waitForState(DEFAULT_COLLECTION, 30, TimeUnit.SECONDS, (c) -> c != null && c.getSlices().size() == sliceCount);
ExecutorService customThreadPool = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("closeThreadPool")); ExecutorService customThreadPool = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("closeThreadPool"));
@ -585,7 +585,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
protected void waitForActiveReplicaCount(CloudSolrClient client, String collection, int expectedNumReplicas) throws TimeoutException, NotInClusterStateException { protected void waitForActiveReplicaCount(CloudSolrClient client, String collection, int expectedNumReplicas) throws TimeoutException, NotInClusterStateException {
AtomicInteger nReplicas = new AtomicInteger(); AtomicInteger nReplicas = new AtomicInteger();
try { try {
client.getZkStateReader().waitForState(collection, 30, TimeUnit.SECONDS, (n, c) -> { client.getZkStateReader().waitForState(collection, 30, TimeUnit.SECONDS, (c) -> {
if (c == null) if (c == null)
return false; return false;
int numReplicas = getTotalReplicas(c, c.getName()); int numReplicas = getTotalReplicas(c, c.getName());

View File

@ -553,7 +553,7 @@ public class MiniSolrCloudCluster {
} }
for (String collection : reader.getClusterState().getCollectionStates().keySet()) { for (String collection : reader.getClusterState().getCollectionStates().keySet()) {
reader.waitForState(collection, 15, TimeUnit.SECONDS, (liveNodes, collectionState) -> collectionState == null ? true : false); reader.waitForState(collection, 15, TimeUnit.SECONDS, (collectionState) -> collectionState == null ? true : false);
} }
} }