SOLR-13490: Fix CollectionStateWatcher/CollectionStatePredicate based APIs in ZkStateReader and CloudSolrClient to be triggered on liveNode changes.

Also add Predicate<DocCollection> equivilents for callers that don't care about liveNodes.
This commit is contained in:
Chris Hostetter 2019-06-17 09:59:43 -07:00
parent dff7611096
commit 5a974860fa
19 changed files with 889 additions and 112 deletions

View File

@ -159,6 +159,10 @@ Bug Fixes
* SOLR-13333: unleashing terms.ttf from terms.list when distrib=false (Munendra S N via Mikhail Khludnev)
* SOLR-13490: Fix CollectionStateWatcher/CollectionStatePredicate based APIs in ZkStateReader and
CloudSolrClient to be triggered on liveNode changes. Also add Predicate<DocCollection> equivilents
for callers that don't care about liveNodes. (hossman)
Other Changes
----------------------

View File

@ -68,12 +68,12 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.BeforeReconnect;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.CollectionStateWatcher;
import org.apache.solr.common.cloud.ConnectionManager;
import org.apache.solr.common.cloud.DefaultConnectionStrategy;
import org.apache.solr.common.cloud.DefaultZkACLProvider;
import org.apache.solr.common.cloud.DefaultZkCredentialsProvider;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocCollectionWatcher;
import org.apache.solr.common.cloud.LiveNodesListener;
import org.apache.solr.common.cloud.OnReconnect;
import org.apache.solr.common.cloud.Replica;
@ -1052,7 +1052,7 @@ public class ZkController implements Closeable {
CountDownLatch latch = new CountDownLatch(collectionsWithLocalReplica.size());
for (String collectionWithLocalReplica : collectionsWithLocalReplica) {
zkStateReader.registerCollectionStateWatcher(collectionWithLocalReplica, (liveNodes, collectionState) -> {
zkStateReader.registerDocCollectionWatcher(collectionWithLocalReplica, (collectionState) -> {
if (collectionState == null) return false;
boolean foundStates = true;
for (CoreDescriptor coreDescriptor : cc.getCoreDescriptors()) {
@ -1194,7 +1194,7 @@ public class ZkController implements Closeable {
// check replica's existence in clusterstate first
try {
zkStateReader.waitForState(collection, Overseer.isLegacy(zkStateReader) ? 60000 : 100,
TimeUnit.MILLISECONDS, (liveNodes, collectionState) -> getReplicaOrNull(collectionState, shardId, coreZkNodeName) != null);
TimeUnit.MILLISECONDS, (collectionState) -> getReplicaOrNull(collectionState, shardId, coreZkNodeName) != null);
} catch (TimeoutException e) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Error registering SolrCore, timeout waiting for replica present in clusterstate");
}
@ -1301,7 +1301,7 @@ public class ZkController implements Closeable {
// make sure we have an update cluster state right away
zkStateReader.forceUpdateCollection(collection);
// the watcher is added to a set so multiple calls of this method will left only one watcher
zkStateReader.registerCollectionStateWatcher(cloudDesc.getCollectionName(),
zkStateReader.registerDocCollectionWatcher(cloudDesc.getCollectionName(),
new UnloadCoreOnDeletedWatcher(coreZkNodeName, shardId, desc.getName()));
return shardId;
} finally {
@ -1845,7 +1845,7 @@ public class ZkController implements Closeable {
AtomicReference<String> errorMessage = new AtomicReference<>();
AtomicReference<DocCollection> collectionState = new AtomicReference<>();
try {
zkStateReader.waitForState(cd.getCollectionName(), 10, TimeUnit.SECONDS, (n, c) -> {
zkStateReader.waitForState(cd.getCollectionName(), 10, TimeUnit.SECONDS, (c) -> {
collectionState.set(c);
if (c == null)
return false;
@ -2545,7 +2545,7 @@ public class ZkController implements Closeable {
};
}
private class UnloadCoreOnDeletedWatcher implements CollectionStateWatcher {
private class UnloadCoreOnDeletedWatcher implements DocCollectionWatcher {
String coreNodeName;
String shard;
String coreName;
@ -2558,7 +2558,7 @@ public class ZkController implements Closeable {
@Override
// synchronized due to SOLR-11535
public synchronized boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
public synchronized boolean onStateChanged(DocCollection collectionState) {
if (getCoreContainer().getCoreDescriptor(coreName) == null) return true;
boolean replicaRemoved = getReplicaOrNull(collectionState, shard, coreNodeName) == null;

View File

@ -321,7 +321,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
CollectionAdminParams.COLOCATED_WITH, collectionName);
ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
try {
zkStateReader.waitForState(withCollection, 5, TimeUnit.SECONDS, (liveNodes, collectionState) -> collectionName.equals(collectionState.getStr(COLOCATED_WITH)));
zkStateReader.waitForState(withCollection, 5, TimeUnit.SECONDS, (collectionState) -> collectionName.equals(collectionState.getStr(COLOCATED_WITH)));
} catch (TimeoutException e) {
log.warn("Timed out waiting to see the " + COLOCATED_WITH + " property set on collection: " + withCollection);
// maybe the overseer queue is backed up, we don't want to fail the create request

View File

@ -134,7 +134,7 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
// wait for a while until we don't see the collection
zkStateReader.waitForState(collection, 60, TimeUnit.SECONDS, (liveNodes, collectionState) -> collectionState == null);
zkStateReader.waitForState(collection, 60, TimeUnit.SECONDS, (collectionState) -> collectionState == null);
// we can delete any remaining unique aliases
if (!aliasReferences.isEmpty()) {

View File

@ -134,7 +134,7 @@ public class DeleteShardCmd implements OverseerCollectionMessageHandler.Cmd {
ZkStateReader zkStateReader = ocmh.zkStateReader;
ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
zkStateReader.waitForState(collectionName, 45, TimeUnit.SECONDS, (l, c) -> c.getSlice(sliceId) == null);
zkStateReader.waitForState(collectionName, 45, TimeUnit.SECONDS, (c) -> c.getSlice(sliceId) == null);
log.info("Successfully deleted collection: " + collectionName + ", shard: " + sliceId);
} catch (SolrException e) {

View File

@ -416,7 +416,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
boolean waitForCoreNodeGone(String collectionName, String shard, String replicaName, int timeoutms) throws InterruptedException {
try {
zkStateReader.waitForState(collectionName, timeoutms, TimeUnit.MILLISECONDS, (n, c) -> {
zkStateReader.waitForState(collectionName, timeoutms, TimeUnit.MILLISECONDS, (c) -> {
if (c == null)
return true;
Slice slice = c.getSlice(shard);

View File

@ -35,8 +35,8 @@ import org.apache.solr.client.solrj.request.CoreStatus;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.CollectionStateWatcher;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocCollectionWatcher;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
@ -115,7 +115,7 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
JettySolrRunner replicaJetty = cluster.getReplicaJetty(replica);
ZkStateReaderAccessor accessor = new ZkStateReaderAccessor(replicaJetty.getCoreContainer().getZkController().getZkStateReader());
Set<CollectionStateWatcher> watchers = accessor.getStateWatchers(collectionName);
Set<DocCollectionWatcher> watchers = accessor.getStateWatchers(collectionName);
CollectionAdminRequest.deleteReplica(collectionName, shard.getName(), replica.getName())
.process(cluster.getSolrClient());
waitForState("Expected replica " + replica.getName() + " to have been removed", collectionName, (n, c) -> {
@ -221,7 +221,7 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
Replica replica = getRandomReplica(shard);
JettySolrRunner replicaJetty = cluster.getReplicaJetty(replica);
ZkStateReaderAccessor accessor = new ZkStateReaderAccessor(replicaJetty.getCoreContainer().getZkController().getZkStateReader());
Set<CollectionStateWatcher> watchers = accessor.getStateWatchers(collectionName);
Set<DocCollectionWatcher> watchers = accessor.getStateWatchers(collectionName);
ZkNodeProps m = new ZkNodeProps(
Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(),

View File

@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.embedded.JettyConfig;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.common.cloud.CollectionStatePredicate;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.util.DefaultSolrThreadFactory;
import static org.apache.solr.cloud.SolrCloudTestCase.clusterShape;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestWaitForStateWithJettyShutdowns extends SolrTestCaseJ4 {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public void testWaitForStateAfterShutDown() throws Exception {
final String col_name = "test_col";
final MiniSolrCloudCluster cluster = new MiniSolrCloudCluster
(1, createTempDir(), JettyConfig.builder().build());
try {
log.info("Create our collection");
CollectionAdminRequest.createCollection(col_name, "_default", 1, 1).process(cluster.getSolrClient());
log.info("Sanity check that our collection has come online");
cluster.getSolrClient().waitForState(col_name, 30, TimeUnit.SECONDS, clusterShape(1, 1));
log.info("Shutdown 1 node");
final JettySolrRunner nodeToStop = cluster.getJettySolrRunner(0);
nodeToStop.stop();
log.info("Wait to confirm our node is fully shutdown");
cluster.waitForJettyToStop(nodeToStop);
// now that we're confident that node has stoped, check if a waitForState
// call will detect the missing replica -- shouldn't need long wait times (we know it's down)...
log.info("Now check if waitForState will recognize we already have the exepcted state");
cluster.getSolrClient().waitForState(col_name, 500, TimeUnit.MILLISECONDS, clusterShape(1, 0));
} finally {
cluster.shutdown();
}
}
public void testWaitForStateBeforeShutDown() throws Exception {
final String col_name = "test_col";
final ExecutorService executor = ExecutorUtil.newMDCAwareFixedThreadPool
(1, new DefaultSolrThreadFactory("background_executor"));
final MiniSolrCloudCluster cluster = new MiniSolrCloudCluster
(1, createTempDir(), JettyConfig.builder().build());
try {
log.info("Create our collection");
CollectionAdminRequest.createCollection(col_name, "_default", 1, 1).process(cluster.getSolrClient());
log.info("Sanity check that our collection has come online");
cluster.getSolrClient().waitForState(col_name, 30, TimeUnit.SECONDS,
SolrCloudTestCase.clusterShape(1, 1));
// HACK implementation detail...
//
// we know that in the current implementation, waitForState invokes the predicate twice
// independently of the current state of the collection and/or wether the predicate succeeds.
// If this implementation detail changes, (ie: so that it's only invoked once)
// then this number needs to change -- but the test fundementally depends on the implementation
// calling the predicate at least once, which should also be neccessary for any future impl
// (to verify that it didn't "miss" the state change when creating the watcher)
final CountDownLatch latch = new CountDownLatch(2);
final Future<?> backgroundWaitForState = executor.submit
(() -> {
try {
cluster.getSolrClient().waitForState(col_name, 180, TimeUnit.SECONDS,
new LatchCountingPredicateWrapper(latch,
clusterShape(1, 0)));
} catch (Exception e) {
log.error("background thread got exception", e);
throw new RuntimeException(e);
}
return;
}, null);
log.info("Awaiting latch...");
if (! latch.await(120, TimeUnit.SECONDS)) {
fail("timed out Waiting a ridiculous amount of time for the waitForState latch -- did impl change?");
}
log.info("Shutdown 1 node");
final JettySolrRunner nodeToStop = cluster.getJettySolrRunner(0);
nodeToStop.stop();
log.info("Wait to confirm our node is fully shutdown");
cluster.waitForJettyToStop(nodeToStop);
// now that we're confident that node has stoped, check if a waitForState
// call will detect the missing replica -- shouldn't need long wait times...
log.info("Checking Future result to see if waitForState finished successfully");
try {
backgroundWaitForState.get();
} catch (ExecutionException e) {
log.error("background waitForState exception", e);
throw e;
}
} finally {
ExecutorUtil.shutdownAndAwaitTermination(executor);
cluster.shutdown();
}
}
public final class LatchCountingPredicateWrapper implements CollectionStatePredicate {
private final CountDownLatch latch;
private final CollectionStatePredicate inner;
public LatchCountingPredicateWrapper(final CountDownLatch latch, final CollectionStatePredicate inner) {
this.latch = latch;
this.inner = inner;
}
public boolean matches(Set<String> liveNodes, DocCollection collectionState) {
final boolean result = inner.matches(liveNodes, collectionState);
log.info("Predicate called: result={}, (pre)latch={}, liveNodes={}, state={}",
result, latch.getCount(), liveNodes, collectionState);
latch.countDown();
return result;
}
}
}

View File

@ -28,7 +28,7 @@ public class ZkStateReaderAccessor {
this.zkStateReader = zkStateReader;
}
public Set<CollectionStateWatcher> getStateWatchers(String collection) {
public Set<DocCollectionWatcher> getStateWatchers(String collection) {
return zkStateReader.getStateWatchers(collection);
}

View File

@ -42,6 +42,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.solr.client.solrj.ResponseParser;
@ -62,6 +63,7 @@ import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.CollectionStatePredicate;
import org.apache.solr.common.cloud.CollectionStateWatcher;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocCollectionWatcher;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.Replica;
@ -362,11 +364,21 @@ public abstract class BaseCloudSolrClient extends SolrClient {
}
/**
* Block until a collection state matches a predicate, or a timeout
* Block until a CollectionStatePredicate returns true, or the wait times out
*
* <p>
* Note that the predicate may be called again even after it has returned true, so
* implementors should avoid changing state within the predicate call itself.
* </p>
*
* <p>
* This implementation utilizes {@link CollectionStateWatcher} internally.
* Callers that don't care about liveNodes are encouraged to use a {@link DocCollection} {@link Predicate}
* instead
* </p>
*
* @see #waitForState(String, long, TimeUnit, Predicate)
* @see #registerCollectionStateWatcher
* @param collection the collection to watch
* @param wait how long to wait
* @param unit the units of the wait parameter
@ -379,14 +391,45 @@ public abstract class BaseCloudSolrClient extends SolrClient {
getClusterStateProvider().connect();
assertZKStateProvider().zkStateReader.waitForState(collection, wait, unit, predicate);
}
/**
* Block until a Predicate returns true, or the wait times out
*
* <p>
* Note that the predicate may be called again even after it has returned true, so
* implementors should avoid changing state within the predicate call itself.
* </p>
*
* @see #registerDocCollectionWatcher
* @param collection the collection to watch
* @param wait how long to wait
* @param unit the units of the wait parameter
* @param predicate a {@link Predicate} to test against the {@link DocCollection}
* @throws InterruptedException on interrupt
* @throws TimeoutException on timeout
*/
public void waitForState(String collection, long wait, TimeUnit unit, Predicate<DocCollection> predicate)
throws InterruptedException, TimeoutException {
getClusterStateProvider().connect();
assertZKStateProvider().zkStateReader.waitForState(collection, wait, unit, predicate);
}
/**
* Register a CollectionStateWatcher to be called when the cluster state for a collection changes
* <em>or</em> the set of live nodes changes.
*
* Note that the watcher is unregistered after it has been called once. To make a watcher persistent,
* it should re-register itself in its {@link CollectionStateWatcher#onStateChanged(Set, DocCollection)}
* call
* <p>
* The Watcher will automatically be removed when it's
* <code>onStateChanged</code> returns <code>true</code>
* </p>
*
* <p>
* This implementation utilizes {@link ZkStateReader#registerCollectionStateWatcher} internally.
* Callers that don't care about liveNodes are encouraged to use a {@link DocCollectionWatcher}
* instead
* </p>
*
* @see #registerDocCollectionWatcher(String, DocCollectionWatcher)
* @see ZkStateReader#registerCollectionStateWatcher
* @param collection the collection to watch
* @param watcher a watcher that will be called when the state changes
*/
@ -395,6 +438,23 @@ public abstract class BaseCloudSolrClient extends SolrClient {
assertZKStateProvider().zkStateReader.registerCollectionStateWatcher(collection, watcher);
}
/**
* Register a DocCollectionWatcher to be called when the cluster state for a collection changes.
*
* <p>
* The Watcher will automatically be removed when it's
* <code>onStateChanged</code> returns <code>true</code>
* </p>
*
* @see ZkStateReader#registerDocCollectionWatcher
* @param collection the collection to watch
* @param watcher a watcher that will be called when the state changes
*/
public void registerDocCollectionWatcher(String collection, DocCollectionWatcher watcher) {
getClusterStateProvider().connect();
assertZKStateProvider().zkStateReader.registerDocCollectionWatcher(collection, watcher);
}
private NamedList<Object> directUpdate(AbstractUpdateRequest request, String collection) throws SolrServerException {
UpdateRequest updateRequest = (UpdateRequest) request;
SolrParams params = request.getParams();

View File

@ -19,23 +19,27 @@ package org.apache.solr.common.cloud;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
/**
* Interface to determine if a collection state matches a required state
* Interface to determine if a set of liveNodes and a collection's state matches some expecatations.
*
* @see ZkStateReader#waitForState(String, long, TimeUnit, CollectionStatePredicate)
* @see ZkStateReader#waitForState(String, long, TimeUnit, Predicate)
*/
public interface CollectionStatePredicate {
/**
* Check the collection state matches a required state
*
* Check if the set of liveNodes <em>and</em> the collection state matches a required state
* <p>
* Note that both liveNodes and collectionState should be consulted to determine
* the overall state.
* </p>
*
* @param liveNodes the current set of live nodes
* @param collectionState the latest collection state, or null if the collection
* does not exist
* @return true if the input matches the requirements of this predicate
*/
boolean matches(Set<String> liveNodes, DocCollection collectionState);

View File

@ -21,21 +21,24 @@ import java.util.Set;
/**
* Callback registered with {@link ZkStateReader#registerCollectionStateWatcher(String, CollectionStateWatcher)}
* and called whenever the collection state changes.
* and called whenever there is a change in the collection state <em>or</em> in the list of liveNodes.
*
* @see DocCollectionWatcher
*/
public interface CollectionStateWatcher {
/**
* Called when the collection we are registered against has a change of state.
* Called when either the collection we are registered against has a change of state <em>or</em> there is a change to the live nodes of our collection.
*
* <p>
* Note that, due to the way Zookeeper watchers are implemented, a single call may be
* the result of several state changes. Also, multiple calls to this method can be made
* with the same state, ie. without any new updates.
* </p>
*
* @param liveNodes the set of live nodes
* @param collectionState the new collection state (may be null if the collection has been
* deleted)
*
* @return true if the watcher should be removed
*/
boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState);

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.common.cloud;
/**
* Callback registered with {@link ZkStateReader#registerDocCollectionWatcher(String, DocCollectionWatcher)}
* and called whenever the DocCollection changes.
*/
public interface DocCollectionWatcher {
/**
* Called when the collection we are registered against has a change of state.
*
* <p>
* Note that, due to the way Zookeeper watchers are implemented, a single call may be
* the result of several state changes. Also, multiple calls to this method can be made
* with the same state, ie. without any new updates.
* </p>
*
* @param collection the new collection state (may be null if the collection has been deleted)
* @return true if the watcher should be removed
*/
boolean onStateChanged(DocCollection collection);
}

View File

@ -41,6 +41,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
@ -185,7 +186,7 @@ public class ZkStateReader implements SolrCloseable {
private final Runnable securityNodeListener;
private ConcurrentHashMap<String, CollectionWatch<CollectionStateWatcher>> collectionWatches = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, CollectionWatch<DocCollectionWatcher>> collectionWatches = new ConcurrentHashMap<>();
// named this observers so there's less confusion between CollectionPropsWatcher map and the PropsWatcher map.
private ConcurrentHashMap<String, CollectionWatch<CollectionPropsWatcher>> collectionPropsObservers = new ConcurrentHashMap<>();
@ -590,7 +591,7 @@ public class ZkStateReader implements SolrCloseable {
notifyCloudCollectionsListeners();
for (String collection : changedCollections) {
notifyStateWatchers(liveNodes, collection, clusterState.getCollectionOrNull(collection));
notifyStateWatchers(collection, clusterState.getCollectionOrNull(collection));
}
}
@ -1599,8 +1600,45 @@ public class ZkStateReader implements SolrCloseable {
/**
* Register a CollectionStateWatcher to be called when the state of a collection changes
* <em>or</em> the set of live nodes changes.
*
* <p>
* The Watcher will automatically be removed when it's
* <code>onStateChanged</code> returns <code>true</code>
* </p>
*
* <p>
* This is method is just syntactic sugar for registering both a {@link DocCollectionWatcher} and
* a {@link LiveNodesListener}. Callers that only care about one or the other (but not both) are
* encouraged to use the more specific methods register methods as it may reduce the number of
* ZooKeeper watchers needed, and reduce the amount of network/cpu used.
* </p>
*
* @see #registerDocCollectionWatcher
* @see #registerLiveNodesListener
*/
public void registerCollectionStateWatcher(String collection, CollectionStateWatcher stateWatcher) {
final DocCollectionAndLiveNodesWatcherWrapper wrapper
= new DocCollectionAndLiveNodesWatcherWrapper(collection, stateWatcher);
registerDocCollectionWatcher(collection, wrapper);
registerLiveNodesListener(wrapper);
DocCollection state = clusterState.getCollectionOrNull(collection);
if (stateWatcher.onStateChanged(liveNodes, state) == true) {
removeCollectionStateWatcher(collection, stateWatcher);
}
}
/**
* Register a DocCollectionWatcher to be called when the state of a collection changes
*
* <p>
* The Watcher will automatically be removed when it's
* <code>onStateChanged</code> returns <code>true</code>
* </p>
*/
public void registerDocCollectionWatcher(String collection, DocCollectionWatcher stateWatcher) {
AtomicBoolean watchSet = new AtomicBoolean(false);
collectionWatches.compute(collection, (k, v) -> {
if (v == null) {
@ -1616,17 +1654,27 @@ public class ZkStateReader implements SolrCloseable {
}
DocCollection state = clusterState.getCollectionOrNull(collection);
if (stateWatcher.onStateChanged(liveNodes, state) == true) {
removeCollectionStateWatcher(collection, stateWatcher);
if (stateWatcher.onStateChanged(state) == true) {
removeDocCollectionWatcher(collection, stateWatcher);
}
}
/**
* Block until a CollectionStatePredicate returns true, or the wait times out
*
* <p>
* Note that the predicate may be called again even after it has returned true, so
* implementors should avoid changing state within the predicate call itself.
* </p>
*
* <p>
* This implementation utilizes {@link CollectionStateWatcher} internally.
* Callers that don't care about liveNodes are encouraged to use a {@link DocCollection} {@link Predicate}
* instead
* </p>
*
* @see #waitForState(String, long, TimeUnit, Predicate)
* @see #registerCollectionStateWatcher
* @param collection the collection to watch
* @param wait how long to wait
* @param unit the units of the wait parameter
@ -1667,11 +1715,58 @@ public class ZkStateReader implements SolrCloseable {
}
/**
* Block until a LiveNodesStatePredicate returns true, or the wait times out
* Block until a Predicate returns true, or the wait times out
*
* <p>
* Note that the predicate may be called again even after it has returned true, so
* implementors should avoid changing state within the predicate call itself.
* </p>
*
* @param collection the collection to watch
* @param wait how long to wait
* @param unit the units of the wait parameter
* @param predicate the predicate to call on state changes
* @throws InterruptedException on interrupt
* @throws TimeoutException on timeout
*/
public void waitForState(final String collection, long wait, TimeUnit unit, Predicate<DocCollection> predicate)
throws InterruptedException, TimeoutException {
if (closed) {
throw new AlreadyClosedException();
}
final CountDownLatch latch = new CountDownLatch(1);
waitLatches.add(latch);
AtomicReference<DocCollection> docCollection = new AtomicReference<>();
DocCollectionWatcher watcher = (c) -> {
docCollection.set(c);
boolean matches = predicate.test(c);
if (matches)
latch.countDown();
return matches;
};
registerDocCollectionWatcher(collection, watcher);
try {
// wait for the watcher predicate to return true, or time out
if (!latch.await(wait, unit))
throw new TimeoutException("Timeout waiting to see state for collection=" + collection + " :" + docCollection.get());
}
finally {
removeDocCollectionWatcher(collection, watcher);
waitLatches.remove(latch);
}
}
/**
* Block until a LiveNodesStatePredicate returns true, or the wait times out
* <p>
* Note that the predicate may be called again even after it has returned true, so
* implementors should avoid changing state within the predicate call itself.
* </p>
* @param wait how long to wait
* @param unit the units of the wait parameter
* @param predicate the predicate to call on state changes
@ -1713,14 +1808,35 @@ public class ZkStateReader implements SolrCloseable {
/**
* Remove a watcher from a collection's watch list.
*
* <p>
* This allows Zookeeper watches to be removed if there is no interest in the
* collection.
* </p>
*
* @see #registerCollectionStateWatcher
* @param collection the collection
* @param watcher the watcher
*/
public void removeCollectionStateWatcher(String collection, CollectionStateWatcher watcher) {
final DocCollectionAndLiveNodesWatcherWrapper wrapper
= new DocCollectionAndLiveNodesWatcherWrapper(collection, watcher);
removeDocCollectionWatcher(collection, wrapper);
removeLiveNodesListener(wrapper);
}
/**
* Remove a watcher from a collection's watch list.
* <p>
* This allows Zookeeper watches to be removed if there is no interest in the
* collection.
* </p>
*
* @see #registerDocCollectionWatcher
* @param collection the collection
* @param watcher the watcher
*/
public void removeDocCollectionWatcher(String collection, DocCollectionWatcher watcher) {
AtomicBoolean reconstructState = new AtomicBoolean(false);
collectionWatches.compute(collection, (k, v) -> {
if (v == null)
@ -1742,8 +1858,8 @@ public class ZkStateReader implements SolrCloseable {
}
/* package-private for testing */
Set<CollectionStateWatcher> getStateWatchers(String collection) {
final Set<CollectionStateWatcher> watchers = new HashSet<>();
Set<DocCollectionWatcher> getStateWatchers(String collection) {
final Set<DocCollectionWatcher> watchers = new HashSet<>();
collectionWatches.compute(collection, (k, v) -> {
if (v != null) {
watchers.addAll(v.stateWatchers);
@ -1845,12 +1961,12 @@ public class ZkStateReader implements SolrCloseable {
}
}
private void notifyStateWatchers(Set<String> liveNodes, String collection, DocCollection collectionState) {
private void notifyStateWatchers(String collection, DocCollection collectionState) {
if (this.closed) {
return;
}
try {
notifications.submit(new Notification(liveNodes, collection, collectionState));
notifications.submit(new Notification(collection, collectionState));
}
catch (RejectedExecutionException e) {
if (closed == false) {
@ -1861,29 +1977,27 @@ public class ZkStateReader implements SolrCloseable {
private class Notification implements Runnable {
final Set<String> liveNodes;
final String collection;
final DocCollection collectionState;
private Notification(Set<String> liveNodes, String collection, DocCollection collectionState) {
this.liveNodes = liveNodes;
private Notification(String collection, DocCollection collectionState) {
this.collection = collection;
this.collectionState = collectionState;
}
@Override
public void run() {
List<CollectionStateWatcher> watchers = new ArrayList<>();
List<DocCollectionWatcher> watchers = new ArrayList<>();
collectionWatches.compute(collection, (k, v) -> {
if (v == null)
return null;
watchers.addAll(v.stateWatchers);
return v;
});
for (CollectionStateWatcher watcher : watchers) {
for (DocCollectionWatcher watcher : watchers) {
try {
if (watcher.onStateChanged(liveNodes, collectionState)) {
removeCollectionStateWatcher(collection, watcher);
if (watcher.onStateChanged(collectionState)) {
removeDocCollectionWatcher(collection, watcher);
}
} catch (Exception exception) {
log.warn("Error on calling watcher", exception);
@ -2118,4 +2232,54 @@ public class ZkStateReader implements SolrCloseable {
}
}
/**
* Helper class that acts as both a {@link DocCollectionWatcher} and a {@link LiveNodesListener}
* while wraping and delegating to a {@link CollectionStateWatcher}
*/
private final class DocCollectionAndLiveNodesWatcherWrapper implements DocCollectionWatcher, LiveNodesListener {
private final String collectionName;
private final CollectionStateWatcher delegate;
public int hashCode() {
return collectionName.hashCode() * delegate.hashCode();
}
public boolean equals(Object other) {
if (other instanceof DocCollectionAndLiveNodesWatcherWrapper) {
DocCollectionAndLiveNodesWatcherWrapper that
= (DocCollectionAndLiveNodesWatcherWrapper) other;
return this.collectionName.equals(that.collectionName)
&& this.delegate.equals(that.delegate);
}
return false;
}
public DocCollectionAndLiveNodesWatcherWrapper(final String collectionName,
final CollectionStateWatcher delegate) {
this.collectionName = collectionName;
this.delegate = delegate;
}
@Override
public boolean onStateChanged(DocCollection collectionState) {
final boolean result = delegate.onStateChanged(ZkStateReader.this.liveNodes,
collectionState);
if (result) {
// it might be a while before live nodes changes, so proactively remove ourselves
removeLiveNodesListener(this);
}
return result;
}
@Override
public boolean onChange(SortedSet<String> oldLiveNodes, SortedSet<String> newLiveNodes) {
final DocCollection collection = ZkStateReader.this.clusterState.getCollectionOrNull(collectionName);
final boolean result = delegate.onStateChanged(newLiveNodes, collection);
if (result) {
// it might be a while before collection changes, so proactively remove ourselves
removeDocCollectionWatcher(collectionName, this);
}
return result;
}
}
}

View File

@ -37,12 +37,14 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** @see TestDocCollectionWatcher */
public class TestCollectionStateWatchers extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final int CLUSTER_SIZE = 4;
private static final int MAX_WAIT_TIMEOUT = 30;
private static final int MAX_WAIT_TIMEOUT = 120; // seconds, only use for await -- NO SLEEP!!!
private ExecutorService executor = null;
@ -75,6 +77,7 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
private void waitFor(String message, long timeout, TimeUnit unit, Callable<Boolean> predicate)
throws InterruptedException, ExecutionException {
Future<Boolean> future = executor.submit(() -> {
try {
while (true) {
@ -100,45 +103,66 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
}
@Test
//Commented 14-Oct-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 12-Jun-2018
public void testSimpleCollectionWatch() throws Exception {
public void testCollectionWatchWithShutdownOfActiveNode() throws Exception {
doTestCollectionWatchWithNodeShutdown(false);
}
@Test
public void testCollectionWatchWithShutdownOfUnusedNode() throws Exception {
doTestCollectionWatchWithNodeShutdown(true);
}
private void doTestCollectionWatchWithNodeShutdown(final boolean shutdownUnusedNode)
throws Exception {
CloudSolrClient client = cluster.getSolrClient();
CollectionAdminRequest.createCollection("testcollection", "config", 4, 1)
// note: one node in our cluster is unsed by collection
CollectionAdminRequest.createCollection("testcollection", "config", CLUSTER_SIZE, 1)
.processAndWait(client, MAX_WAIT_TIMEOUT);
client.waitForState("testcollection", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(n, c) -> DocCollection.isFullyActive(n, c, 4, 1));
(n, c) -> DocCollection.isFullyActive(n, c, CLUSTER_SIZE, 1));
final JettySolrRunner extraJetty = cluster.startJettySolrRunner();
final JettySolrRunner jettyToShutdown
= shutdownUnusedNode ? extraJetty : cluster.getJettySolrRunners().get(0);
final int expectedNodesWithActiveReplicas = CLUSTER_SIZE - (shutdownUnusedNode ? 0 : 1);
cluster.waitForAllNodes(MAX_WAIT_TIMEOUT);
// shutdown a node and check that we get notified about the change
final CountDownLatch latch = new CountDownLatch(1);
client.registerCollectionStateWatcher("testcollection", (liveNodes, collectionState) -> {
int nodeCount = 0;
int nodesWithActiveReplicas = 0;
log.info("State changed: {}", collectionState);
for (Slice slice : collectionState) {
for (Replica replica : slice) {
if (replica.isActive(liveNodes))
nodeCount++;
nodesWithActiveReplicas++;
}
}
if (nodeCount == 3) {
if (liveNodes.size() == CLUSTER_SIZE
&& expectedNodesWithActiveReplicas == nodesWithActiveReplicas) {
latch.countDown();
return true;
}
return false;
});
JettySolrRunner j = cluster.stopJettySolrRunner(random().nextInt(cluster.getJettySolrRunners().size()));
cluster.waitForJettyToStop(j);
assertTrue("CollectionStateWatcher was never notified of cluster change", latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
cluster.stopJettySolrRunner(jettyToShutdown);
cluster.waitForJettyToStop(jettyToShutdown);
waitFor("CollectionStateWatcher wasn't cleared after completion", 1, TimeUnit.SECONDS,
assertTrue("CollectionStateWatcher was never notified of cluster change",
latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
waitFor("CollectionStateWatcher wasn't cleared after completion",
MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
() -> client.getZkStateReader().getStateWatchers("testcollection").isEmpty());
}
@Test
// commented 20-July-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 12-Jun-2018
public void testStateWatcherChecksCurrentStateOnRegister() throws Exception {
CloudSolrClient client = cluster.getSolrClient();
@ -151,7 +175,8 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
return false;
});
assertTrue("CollectionStateWatcher isn't called on new registration", latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
assertTrue("CollectionStateWatcher isn't called on new registration",
latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
assertEquals("CollectionStateWatcher should be retained",
1, client.getZkStateReader().getStateWatchers("currentstate").size());
@ -163,7 +188,7 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
assertTrue("CollectionStateWatcher isn't called when registering for already-watched collection",
latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
waitFor("CollectionStateWatcher should be removed", 1, TimeUnit.SECONDS,
waitFor("CollectionStateWatcher should be removed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
() -> client.getZkStateReader().getStateWatchers("currentstate").size() == 1);
}
@ -180,9 +205,9 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
// several goes, to check that we're not getting delayed state changes
for (int i = 0; i < 10; i++) {
try {
client.waitForState("waitforstate", 1, TimeUnit.SECONDS, (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
}
catch (TimeoutException e) {
client.waitForState("waitforstate", 1, TimeUnit.SECONDS,
(n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
} catch (TimeoutException e) {
fail("waitForState should return immediately if the predicate is already satisfied");
}
}
@ -190,8 +215,6 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
}
@Test
// commented 20-July-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 12-Jun-2018
@BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // annotated on: 24-Dec-2018
public void testCanWaitForNonexistantCollection() throws Exception {
Future<Boolean> future = waitInBackground("delayed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
@ -205,19 +228,19 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
}
@Test
// commented 20-July-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 12-Jun-2018
public void testPredicateFailureTimesOut() throws Exception {
CloudSolrClient client = cluster.getSolrClient();
expectThrows(TimeoutException.class, () -> {
client.waitForState("nosuchcollection", 1, TimeUnit.SECONDS, ((liveNodes, collectionState) -> false));
client.waitForState("nosuchcollection", 1, TimeUnit.SECONDS,
((liveNodes, collectionState) -> false));
});
waitFor("Watchers for collection should be removed after timeout", 1, TimeUnit.SECONDS,
waitFor("Watchers for collection should be removed after timeout",
MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
() -> client.getZkStateReader().getStateWatchers("nosuchcollection").isEmpty());
}
@Test
//Commented 14-Oct-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 12-Jun-2018
public void testWaitForStateWatcherIsRetainedOnPredicateFailure() throws Exception {
CloudSolrClient client = cluster.getSolrClient();
@ -230,58 +253,90 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
final CountDownLatch firstCall = new CountDownLatch(1);
// stop a node, then add a watch waiting for all nodes to be back up
JettySolrRunner node1 = cluster.stopJettySolrRunner(random().nextInt(cluster.getJettySolrRunners().size()));
JettySolrRunner node1 = cluster.stopJettySolrRunner(random().nextInt
(cluster.getJettySolrRunners().size()));
cluster.waitForJettyToStop(node1);
Future<Boolean> future = waitInBackground("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, (liveNodes, collectionState) -> {
Future<Boolean> future = waitInBackground("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(liveNodes, collectionState) -> {
firstCall.countDown();
return DocCollection.isFullyActive(liveNodes, collectionState, 4, 1);
});
// first, stop another node; the watch should not be fired after this!
JettySolrRunner node2 = cluster.stopJettySolrRunner(random().nextInt(cluster.getJettySolrRunners().size()));
JettySolrRunner node2 = cluster.stopJettySolrRunner(random().nextInt
(cluster.getJettySolrRunners().size()));
// now start them both back up
cluster.startJettySolrRunner(node1);
assertTrue("CollectionStateWatcher not called after 30 seconds", firstCall.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
assertTrue("CollectionStateWatcher not called",
firstCall.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
cluster.startJettySolrRunner(node2);
Boolean result = future.get();
assertTrue("Did not see a fully active cluster after 30 seconds", result);
assertTrue("Did not see a fully active cluster", result);
}
@Test
// commented 20-July-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 12-Jun-2018
public void testWatcherIsRemovedAfterTimeout() throws Exception {
CloudSolrClient client = cluster.getSolrClient();
assertTrue("There should be no watchers for a non-existent collection!",
client.getZkStateReader().getStateWatchers("no-such-collection").isEmpty());
expectThrows(TimeoutException.class, () -> {
client.waitForState("no-such-collection", 10, TimeUnit.MILLISECONDS, (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
client.waitForState("no-such-collection", 10, TimeUnit.MILLISECONDS,
(n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
});
waitFor("Watchers for collection should be removed after timeout", 1, TimeUnit.SECONDS,
waitFor("Watchers for collection should be removed after timeout",
MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
() -> client.getZkStateReader().getStateWatchers("no-such-collection").isEmpty());
}
@Test
// commented 20-July-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 09-Apr-2018
public void testDeletionsTriggerWatches() throws Exception {
CollectionAdminRequest.createCollection("tobedeleted", "config", 1, 1)
.process(cluster.getSolrClient());
Future<Boolean> future = waitInBackground("tobedeleted", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, (l, c) -> c == null);
Future<Boolean> future = waitInBackground("tobedeleted", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(l, c) -> c == null);
CollectionAdminRequest.deleteCollection("tobedeleted").process(cluster.getSolrClient());
assertTrue("CollectionStateWatcher not notified of delete call after 30 seconds", future.get());
assertTrue("CollectionStateWatcher not notified of delete call", future.get());
}
@Test
public void testLiveNodeChangesTriggerWatches() throws Exception {
final CloudSolrClient client = cluster.getSolrClient();
CollectionAdminRequest.createCollection("test_collection", "config", 1, 1).process(client);
Future<Boolean> future = waitInBackground("test_collection", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(l, c) -> (l.size() == 1 + CLUSTER_SIZE));
JettySolrRunner unusedJetty = cluster.startJettySolrRunner();
assertTrue("CollectionStateWatcher not notified of new node", future.get());
waitFor("CollectionStateWatcher should be removed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
() -> client.getZkStateReader().getStateWatchers("test_collection").size() == 0);
future = waitInBackground("test_collection", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(l, c) -> (l.size() == CLUSTER_SIZE));
cluster.stopJettySolrRunner(unusedJetty);
assertTrue("CollectionStateWatcher not notified of node lost", future.get());
waitFor("CollectionStateWatcher should be removed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
() -> client.getZkStateReader().getStateWatchers("test_collection").size() == 0);
}
@Test
//Commented 14-Oct-2018 @LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 12-Jun-2018
public void testWatchesWorkForStateFormat1() throws Exception {
final CloudSolrClient client = cluster.getSolrClient();
@ -291,13 +346,14 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
CollectionAdminRequest.createCollection("stateformat1", "config", 1, 1).setStateFormat(1)
.processAndWait(client, MAX_WAIT_TIMEOUT);
assertTrue("CollectionStateWatcher not notified of stateformat=1 collection creation", future.get());
assertTrue("CollectionStateWatcher not notified of stateformat=1 collection creation",
future.get());
Future<Boolean> migrated
= waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
Future<Boolean> migrated = waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(n, c) -> c != null && c.getStateFormat() == 2);
CollectionAdminRequest.migrateCollectionFormat("stateformat1").processAndWait(client, MAX_WAIT_TIMEOUT);
CollectionAdminRequest.migrateCollectionFormat("stateformat1")
.processAndWait(client, MAX_WAIT_TIMEOUT);
assertTrue("CollectionStateWatcher did not persist over state format migration", migrated.get());
}

View File

@ -0,0 +1,291 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.common.cloud;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.util.ExecutorUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** @see TestCollectionStateWatchers */
public class TestDocCollectionWatcher extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final int CLUSTER_SIZE = 4;
private static final int MAX_WAIT_TIMEOUT = 120; // seconds, only use for await -- NO SLEEP!!!
private ExecutorService executor = null;
@Before
public void prepareCluster() throws Exception {
configureCluster(CLUSTER_SIZE)
.addConfig("config", getFile("solrj/solr/collection1/conf").toPath())
.configure();
executor = ExecutorUtil.newMDCAwareCachedThreadPool("backgroundWatchers");
}
@After
public void tearDownCluster() throws Exception {
executor.shutdown();
shutdownCluster();
executor = null;
}
private Future<Boolean> waitInBackground(String collection, long timeout, TimeUnit unit,
Predicate<DocCollection> predicate) {
return executor.submit(() -> {
try {
cluster.getSolrClient().waitForState(collection, timeout, unit, predicate);
} catch (InterruptedException | TimeoutException e) {
return Boolean.FALSE;
}
return Boolean.TRUE;
});
}
private void waitFor(String message, long timeout, TimeUnit unit, Callable<Boolean> predicate)
throws InterruptedException, ExecutionException {
Future<Boolean> future = executor.submit(() -> {
try {
while (true) {
if (predicate.call())
return true;
TimeUnit.MILLISECONDS.sleep(10);
}
}
catch (InterruptedException e) {
return false;
}
});
try {
if (future.get(timeout, unit) == true) {
return;
}
}
catch (TimeoutException e) {
// pass failure message on
}
future.cancel(true);
fail(message);
}
@Test
public void testStateWatcherChecksCurrentStateOnRegister() throws Exception {
CloudSolrClient client = cluster.getSolrClient();
CollectionAdminRequest.createCollection("currentstate", "config", 1, 1)
.processAndWait(client, MAX_WAIT_TIMEOUT);
final CountDownLatch latch = new CountDownLatch(1);
client.registerDocCollectionWatcher("currentstate", (c) -> {
latch.countDown();
return false;
});
assertTrue("DocCollectionWatcher isn't called on new registration",
latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
assertEquals("DocCollectionWatcher should be retained",
1, client.getZkStateReader().getStateWatchers("currentstate").size());
final CountDownLatch latch2 = new CountDownLatch(1);
client.registerDocCollectionWatcher("currentstate", (c) -> {
latch2.countDown();
return true;
});
assertTrue("DocCollectionWatcher isn't called when registering for already-watched collection",
latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
waitFor("DocCollectionWatcher should be removed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
() -> client.getZkStateReader().getStateWatchers("currentstate").size() == 1);
}
@Test
public void testWaitForStateChecksCurrentState() throws Exception {
CloudSolrClient client = cluster.getSolrClient();
CollectionAdminRequest.createCollection("waitforstate", "config", 1, 1)
.processAndWait(client, MAX_WAIT_TIMEOUT);
client.waitForState("waitforstate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
// several goes, to check that we're not getting delayed state changes
for (int i = 0; i < 10; i++) {
try {
client.waitForState("waitforstate", 1, TimeUnit.SECONDS,
(n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
} catch (TimeoutException e) {
fail("waitForState should return immediately if the predicate is already satisfied");
}
}
}
@Test
public void testCanWaitForNonexistantCollection() throws Exception {
Future<Boolean> future = waitInBackground("delayed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(c) -> (null != c));
CollectionAdminRequest.createCollection("delayed", "config", 1, 1)
.processAndWait(cluster.getSolrClient(), MAX_WAIT_TIMEOUT);
assertTrue("waitForState was not triggered by collection creation", future.get());
}
@Test
public void testPredicateFailureTimesOut() throws Exception {
CloudSolrClient client = cluster.getSolrClient();
expectThrows(TimeoutException.class, () -> {
client.waitForState("nosuchcollection", 1, TimeUnit.SECONDS,
((liveNodes, collectionState) -> false));
});
waitFor("Watchers for collection should be removed after timeout",
MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
() -> client.getZkStateReader().getStateWatchers("nosuchcollection").isEmpty());
}
@Test
public void testWaitForStateWatcherIsRetainedOnPredicateFailure() throws Exception {
CloudSolrClient client = cluster.getSolrClient();
CollectionAdminRequest.createCollection("falsepredicate", "config", 1, 1)
.processAndWait(client, MAX_WAIT_TIMEOUT);
// create collection with 1 shard 1 replica...
client.waitForState("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
// set watcher waiting for at least 3 replicas (will fail initially)
final AtomicInteger runCount = new AtomicInteger(0);
final Future<Boolean> future = waitInBackground
("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(collectionState) -> {
runCount.incrementAndGet();
int replicas = 0;
for (Slice slice : collectionState) {
for (Replica replica : slice) {
replicas++;
}
}
return 3 <= replicas;
});
// add a 2nd replica...
CollectionAdminRequest.addReplicaToShard("falsepredicate", "shard1")
.processAndWait(client, MAX_WAIT_TIMEOUT);
client.waitForState("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(n, c) -> DocCollection.isFullyActive(n, c, 1, 2));
// confirm watcher has run at least once and has been retained...
final int runCountSnapshot = runCount.get();
assertTrue(0 < runCountSnapshot);
assertEquals(1, client.getZkStateReader().getStateWatchers("falsepredicate").size());
// now add a 3rd replica...
CollectionAdminRequest.addReplicaToShard("falsepredicate", "shard1")
.processAndWait(client, MAX_WAIT_TIMEOUT);
// now confirm watcher is invoked & removed
assertTrue("watcher never succeeded", future.get());
assertTrue(runCountSnapshot < runCount.get());
waitFor("DocCollectionWatcher should be removed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
() -> client.getZkStateReader().getStateWatchers("falsepredicate").size() == 0);
}
@Test
public void testWatcherIsRemovedAfterTimeout() throws Exception {
CloudSolrClient client = cluster.getSolrClient();
assertTrue("There should be no watchers for a non-existent collection!",
client.getZkStateReader().getStateWatchers("no-such-collection").isEmpty());
expectThrows(TimeoutException.class, () -> {
client.waitForState("no-such-collection", 10, TimeUnit.MILLISECONDS,
(c) -> (false));
});
waitFor("Watchers for collection should be removed after timeout",
MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
() -> client.getZkStateReader().getStateWatchers("no-such-collection").isEmpty());
}
@Test
public void testDeletionsTriggerWatches() throws Exception {
final CloudSolrClient client = cluster.getSolrClient();
CollectionAdminRequest.createCollection("tobedeleted", "config", 1, 1).process(client);
client.waitForState("tobedeleted", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
Future<Boolean> future = waitInBackground("tobedeleted", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(c) -> c == null);
CollectionAdminRequest.deleteCollection("tobedeleted").process(client);
assertTrue("DocCollectionWatcher not notified of delete call", future.get());
}
@Test
public void testWatchesWorkForStateFormat1() throws Exception {
final CloudSolrClient client = cluster.getSolrClient();
Future<Boolean> future = waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(c) -> (null != c) );
CollectionAdminRequest.createCollection("stateformat1", "config", 1, 1).setStateFormat(1)
.processAndWait(client, MAX_WAIT_TIMEOUT);
client.waitForState("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
assertTrue("DocCollectionWatcher not notified of stateformat=1 collection creation",
future.get());
Future<Boolean> migrated = waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
(c) -> c != null && c.getStateFormat() == 2);
CollectionAdminRequest.migrateCollectionFormat("stateformat1")
.processAndWait(client, MAX_WAIT_TIMEOUT);
assertTrue("DocCollectionWatcher did not persist over state format migration", migrated.get());
}
}

View File

@ -205,7 +205,7 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
throws Exception {
log.info("Wait for collection to disappear - collection: " + collection + " failOnTimeout:" + failOnTimeout + " timeout (sec):" + timeoutSeconds);
zkStateReader.waitForState(collection, timeoutSeconds, TimeUnit.SECONDS, (liveNodes, docCollection) -> {
zkStateReader.waitForState(collection, timeoutSeconds, TimeUnit.SECONDS, (docCollection) -> {
if (docCollection == null)
return true;
return false;
@ -237,7 +237,7 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
Thread.sleep(100);
}
zkStateReader.waitForState("collection1", timeOut.timeLeft(SECONDS), TimeUnit.SECONDS, (liveNodes, docCollection) -> {
zkStateReader.waitForState("collection1", timeOut.timeLeft(SECONDS), TimeUnit.SECONDS, (docCollection) -> {
if (docCollection == null)
return false;
@ -253,7 +253,7 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
public static void verifyReplicaStatus(ZkStateReader reader, String collection, String shard, String coreNodeName,
Replica.State expectedState) throws InterruptedException, TimeoutException {
reader.waitForState(collection, 15000, TimeUnit.MILLISECONDS,
(liveNodes, collectionState) -> collectionState != null && collectionState.getSlice(shard) != null
(collectionState) -> collectionState != null && collectionState.getSlice(shard) != null
&& collectionState.getSlice(shard).getReplicasMap().get(coreNodeName) != null
&& collectionState.getSlice(shard).getReplicasMap().get(coreNodeName).getState() == expectedState);
}

View File

@ -405,7 +405,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
.setCreateNodeSet("")
.process(cloudClient).getStatus());
cloudClient.waitForState(DEFAULT_COLLECTION, 30, TimeUnit.SECONDS, (l,c) -> c != null && c.getSlices().size() == sliceCount);
cloudClient.waitForState(DEFAULT_COLLECTION, 30, TimeUnit.SECONDS, (c) -> c != null && c.getSlices().size() == sliceCount);
ExecutorService customThreadPool = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("closeThreadPool"));
@ -585,7 +585,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
protected void waitForActiveReplicaCount(CloudSolrClient client, String collection, int expectedNumReplicas) throws TimeoutException, NotInClusterStateException {
AtomicInteger nReplicas = new AtomicInteger();
try {
client.getZkStateReader().waitForState(collection, 30, TimeUnit.SECONDS, (n, c) -> {
client.getZkStateReader().waitForState(collection, 30, TimeUnit.SECONDS, (c) -> {
if (c == null)
return false;
int numReplicas = getTotalReplicas(c, c.getName());

View File

@ -553,7 +553,7 @@ public class MiniSolrCloudCluster {
}
for (String collection : reader.getClusterState().getCollectionStates().keySet()) {
reader.waitForState(collection, 15, TimeUnit.SECONDS, (liveNodes, collectionState) -> collectionState == null ? true : false);
reader.waitForState(collection, 15, TimeUnit.SECONDS, (collectionState) -> collectionState == null ? true : false);
}
}