diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 925f9d4f9c6..f55e08d5003 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -111,6 +111,8 @@ New Features * SOLR-8208: [subquery] document transformer executes separate requests per result document. (Cao Manh Dat via Mikhail Khludnev) +* SOLR-8323: Add CollectionStateWatcher API (Alan Woodward, Scott Blum) + Bug Fixes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index ae73633cf33..603f849efda 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -1215,23 +1215,10 @@ public final class ZkController { if (context != null) { context.cancelElection(); } - - final Collection cores = cc.getCores(); - - // if there is no SolrCore which is a member of this collection, remove the watch + CloudDescriptor cloudDescriptor = cd.getCloudDescriptor(); - boolean removeWatch = true; - for (SolrCore solrCore : cores) { - final CloudDescriptor cloudDesc = solrCore.getCoreDescriptor().getCloudDescriptor(); - if (cloudDesc != null && cloudDescriptor.getCollectionName().equals(cloudDesc.getCollectionName())) { - removeWatch = false; - break; - } - } - - if (removeWatch) { - zkStateReader.removeZKWatch(collection); - } + zkStateReader.unregisterCore(cloudDescriptor.getCollectionName()); + ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(), ZkStateReader.CORE_NAME_PROP, coreName, ZkStateReader.NODE_NAME_PROP, getNodeName(), @@ -1481,7 +1468,7 @@ public final class ZkController { "Collection {} not visible yet, but flagging it so a watch is registered when it becomes visible" : "Registering watch for collection {}", collectionName); - zkStateReader.addCollectionWatch(collectionName); + zkStateReader.registerCore(collectionName); } catch (KeeperException e) { log.error("", e); throw new
ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e); diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java index 47d094c55f8..ddfda9d1fa8 100644 --- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java @@ -63,6 +63,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 { ZkTestServer server = new ZkTestServer(zkDir); SolrZkClient zkClient = null; + ZkStateReader reader = null; try { server.run(); @@ -72,10 +73,10 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 { zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT); ZkController.createClusterZkNodes(zkClient); - ZkStateReader reader = new ZkStateReader(zkClient); + reader = new ZkStateReader(zkClient); reader.createClusterStateWatchersAndUpdate(); if (isInteresting) { - reader.addCollectionWatch("c1"); + reader.registerCore("c1"); } ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats()); @@ -137,7 +138,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 { assertEquals(2, collection.getStateFormat()); } } finally { - IOUtils.close(zkClient); + IOUtils.close(reader, zkClient); server.shutdown(); } @@ -147,6 +148,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 { String zkDir = createTempDir("testExternalCollectionWatchedNotWatched").toFile().getAbsolutePath(); ZkTestServer server = new ZkTestServer(zkDir); SolrZkClient zkClient = null; + ZkStateReader reader = null; try { server.run(); @@ -156,7 +158,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 { zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT); ZkController.createClusterZkNodes(zkClient); - ZkStateReader reader = new ZkStateReader(zkClient); + reader = new ZkStateReader(zkClient); 
reader.createClusterStateWatchersAndUpdate(); ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats()); @@ -171,13 +173,13 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 { reader.forceUpdateCollection("c1"); assertTrue(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded()); - reader.addCollectionWatch("c1"); + reader.registerCore("c1"); assertFalse(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded()); - reader.removeZKWatch("c1"); + reader.unregisterCore("c1"); assertTrue(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded()); } finally { - IOUtils.close(zkClient); + IOUtils.close(reader, zkClient); server.shutdown(); } } @@ -188,6 +190,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 { ZkTestServer server = new ZkTestServer(zkDir); SolrZkClient zkClient = null; + ZkStateReader reader = null; try { server.run(); @@ -197,9 +200,9 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 { zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT); ZkController.createClusterZkNodes(zkClient); - ZkStateReader reader = new ZkStateReader(zkClient); + reader = new ZkStateReader(zkClient); reader.createClusterStateWatchersAndUpdate(); - reader.addCollectionWatch("c1"); + reader.registerCore("c1"); // Initially there should be no c1 collection. 
assertNull(reader.getClusterState().getCollectionRef("c1")); @@ -235,7 +238,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 { assertFalse(ref.isLazilyLoaded()); assertEquals(2, ref.get().getStateFormat()); } finally { - IOUtils.close(zkClient); + IOUtils.close(reader, zkClient); server.shutdown(); } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java index f6233b8097f..e8a57d73d07 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java @@ -56,6 +56,8 @@ import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.ToleratedUpdateError; import org.apache.solr.common.cloud.Aliases; import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.CollectionStatePredicate; +import org.apache.solr.common.cloud.CollectionStateWatcher; import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.DocRouter; import org.apache.solr.common.cloud.ImplicitDocRouter; @@ -572,6 +574,40 @@ public class CloudSolrClient extends SolrClient { zkStateReader.getConfigManager().downloadConfigDir(configName, downloadPath); } + /** + * Block until a collection state matches a predicate, or a timeout + * + * Note that the predicate may be called again even after it has returned true, so + * implementors should avoid changing state within the predicate call itself. 
+ * + * @param collection the collection to watch + * @param wait how long to wait + * @param unit the units of the wait parameter + * @param predicate a {@link CollectionStatePredicate} to check the collection state + * @throws InterruptedException on interrupt + * @throws TimeoutException on timeout + */ + public void waitForState(String collection, long wait, TimeUnit unit, CollectionStatePredicate predicate) + throws InterruptedException, TimeoutException { + connect(); + zkStateReader.waitForState(collection, wait, unit, predicate); + } + + /** + * Register a CollectionStateWatcher to be called when the cluster state for a collection changes + * + * Note that the watcher is unregistered after it has been called once. To make a watcher persistent, + * it should re-register itself in its {@link CollectionStateWatcher#onStateChanged(Set, DocCollection)} + * call + * + * @param collection the collection to watch + * @param watcher a watcher that will be called when the state changes + */ + public void registerCollectionStateWatcher(String collection, CollectionStateWatcher watcher) { + connect(); + zkStateReader.registerCollectionStateWatcher(collection, watcher); + } + private NamedList directUpdate(AbstractUpdateRequest request, String collection, ClusterState clusterState) throws SolrServerException { UpdateRequest updateRequest = (UpdateRequest) request; ModifiableSolrParams params = (ModifiableSolrParams) request.getParams(); diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java b/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java new file mode 100644 index 00000000000..0b0a28eeed0 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStatePredicate.java @@ -0,0 +1,42 @@ +package org.apache.solr.common.cloud; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * Interface to determine if a collection state matches a required state + * + * @see ZkStateReader#waitForState(String, long, TimeUnit, CollectionStatePredicate) + */ +public interface CollectionStatePredicate { + + /** + * Check the collection state matches a required state + * + * Note that both liveNodes and collectionState should be consulted to determine + * the overall state. + * + * @param liveNodes the current set of live nodes + * @param collectionState the latest collection state, or null if the collection + * does not exist + */ + boolean matches(Set liveNodes, DocCollection collectionState); + +} diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStateWatcher.java b/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStateWatcher.java new file mode 100644 index 00000000000..0bf66b012e8 --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/CollectionStateWatcher.java @@ -0,0 +1,42 @@ +package org.apache.solr.common.cloud; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +import java.util.Set; + +/** + * Callback registered with {@link ZkStateReader#registerCollectionStateWatcher(String, CollectionStateWatcher)} + * and called whenever the collection state changes. + */ +public interface CollectionStateWatcher { + + /** + * Called when the collection we are registered against has a change of state + * + * Note that, due to the way Zookeeper watchers are implemented, a single call may be + * the result of several state changes + * + * A watcher is unregistered after it has been called once. To make a watcher persistent, + * implementors should re-register during this call. 
+ * + * @param liveNodes the set of live nodes + * @param collectionState the new collection state + */ + void onStateChanged(Set liveNodes, DocCollection collectionState); + +} diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java index d30a73fa145..405d69d0077 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java @@ -22,6 +22,8 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Set; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; @@ -35,7 +37,8 @@ import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR; /** * Models a Collection in zookeeper (but that Java name is obviously taken, hence "DocCollection") */ -public class DocCollection extends ZkNodeProps { +public class DocCollection extends ZkNodeProps implements Iterable { + public static final String DOC_ROUTER = "router"; public static final String SHARDS = "shards"; public static final String STATE_FORMAT = "stateFormat"; @@ -217,4 +220,27 @@ public class DocCollection extends ZkNodeProps { if (slice == null) return null; return slice.getLeader(); } + + /** + * Check that all replicas in a collection are live + * + * @see CollectionStatePredicate + */ + public static boolean isFullyActive(Set liveNodes, DocCollection collectionState) { + Objects.requireNonNull(liveNodes); + if (collectionState == null) + return false; + for (Slice slice : collectionState) { + for (Replica replica : slice) { + if (replica.isActive(liveNodes) == false) + return false; + } + } + return true; + } + + @Override + public Iterator iterator() { + return slices.values().iterator(); + } } diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java 
b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java index 3a31d195658..7015dfbfdd5 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java @@ -16,13 +16,15 @@ */ package org.apache.solr.common.cloud; -import static org.apache.solr.common.cloud.ZkStateReader.*; - import java.util.Locale; import java.util.Map; +import java.util.Set; import org.noggit.JSONUtil; +import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP; +import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP; + public class Replica extends ZkNodeProps { /** @@ -116,6 +118,10 @@ public class Replica extends ZkNodeProps { return state; } + public boolean isActive(Set liveNodes) { + return liveNodes.contains(this.nodeName) && this.state == State.ACTIVE; + } + @Override public String toString() { return name + ':' + JSONUtil.toJSON(propMap, -1); // small enough, keep it on one line (i.e. no indent) diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java index 163561a996a..3ace17a6cd3 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java @@ -19,6 +19,7 @@ package org.apache.solr.common.cloud; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Locale; import java.util.Map; @@ -29,24 +30,29 @@ import org.noggit.JSONWriter; /** * A Slice contains immutable information about a logical shard (all replicas that share the same shard id). */ -public class Slice extends ZkNodeProps { +public class Slice extends ZkNodeProps implements Iterable { /** Loads multiple slices into a Map from a generic Map that probably came from deserialized JSON. 
*/ public static Map loadAllFromMap(Map genericSlices) { if (genericSlices == null) return Collections.emptyMap(); - Map result = new LinkedHashMap<>(genericSlices.size()); - for (Map.Entry entry : genericSlices.entrySet()) { + Map result = new LinkedHashMap<>(genericSlices.size()); + for (Map.Entry entry : genericSlices.entrySet()) { String name = entry.getKey(); Object val = entry.getValue(); if (val instanceof Slice) { - result.put(name, (Slice)val); + result.put(name, (Slice) val); } else if (val instanceof Map) { - result.put(name, new Slice(name, null, (Map)val)); + result.put(name, new Slice(name, null, (Map) val)); } } return result; } + @Override + public Iterator iterator() { + return replicas.values().iterator(); + } + /** The slice's state. */ public enum State { diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java index ab031b0ab21..8a0ad1db0e4 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -28,15 +28,22 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.apache.solr.common.Callable; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; +import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.Pair; import org.apache.solr.common.util.Utils; import 
org.apache.zookeeper.CreateMode; @@ -111,12 +118,10 @@ public class ZkStateReader implements Closeable { public static final String SHARD_LEADERS_ZKNODE = "leaders"; public static final String ELECTION_NODE = "election"; - - /** Collections we actively care about, and will try to keep watch on. */ - private final Set interestingCollections = Collections.newSetFromMap(new ConcurrentHashMap<>()); - + /** Collections tracked in the legacy (shared) state format, reflects the contents of clusterstate.json. */ private Map legacyCollectionStates = emptyMap(); + /** Last seen ZK version of clusterstate.json. */ private int legacyClusterStateVersion = 0; @@ -134,6 +139,21 @@ public class ZkStateReader implements Closeable { private final Runnable securityNodeListener; + private ConcurrentHashMap collectionWatches = new ConcurrentHashMap<>(); + + private final ExecutorService notifications = ExecutorUtil.newMDCAwareCachedThreadPool("watches"); + + private class CollectionWatch { + + int coreRefCount = 0; + Set stateWatchers = new HashSet<>(); + + public boolean canBeRemoved() { + return coreRefCount + stateWatchers.size() == 0; + } + + } + public static final Set KNOWN_CLUSTER_PROPS = unmodifiableSet(new HashSet<>(asList( LEGACY_CLOUD, URL_SCHEME, @@ -262,6 +282,7 @@ public class ZkStateReader implements Closeable { * a better design is possible. */ public void forceUpdateCollection(String collection) throws KeeperException, InterruptedException { + synchronized (getUpdateLock()) { if (clusterState == null) { return; @@ -295,6 +316,7 @@ public class ZkStateReader implements Closeable { } constructState(); } + } /** Refresh the set of live nodes. */ @@ -341,10 +363,10 @@ public class ZkStateReader implements Closeable { } // on reconnect of SolrZkClient force refresh and re-add watches. 
+ refreshLiveNodes(new LiveNodeWatcher()); refreshLegacyClusterState(new LegacyClusterStateWatcher()); refreshStateFormat2Collections(); refreshCollectionList(new CollectionsChildWatcher()); - refreshLiveNodes(new LiveNodeWatcher()); synchronized (ZkStateReader.this.getUpdateLock()) { constructState(); @@ -458,7 +480,7 @@ public class ZkStateReader implements Closeable { this.clusterState = new ClusterState(liveNodes, result, legacyClusterStateVersion); LOG.debug("clusterStateSet: legacy [{}] interesting [{}] watched [{}] lazy [{}] total [{}]", legacyCollectionStates.keySet().size(), - interestingCollections.size(), + collectionWatches.keySet().size(), watchedCollectionStates.keySet().size(), lazyCollectionStates.keySet().size(), clusterState.getCollectionStates().size()); @@ -466,7 +488,7 @@ public class ZkStateReader implements Closeable { if (LOG.isTraceEnabled()) { LOG.trace("clusterStateSet: legacy [{}] interesting [{}] watched [{}] lazy [{}] total [{}]", legacyCollectionStates.keySet(), - interestingCollections, + collectionWatches.keySet(), watchedCollectionStates.keySet(), lazyCollectionStates.keySet(), clusterState.getCollectionStates()); @@ -476,8 +498,7 @@ public class ZkStateReader implements Closeable { /** * Refresh legacy (shared) clusterstate.json */ - private void refreshLegacyClusterState(Watcher watcher) - throws KeeperException, InterruptedException { + private void refreshLegacyClusterState(Watcher watcher) throws KeeperException, InterruptedException { try { final Stat stat = new Stat(); final byte[] data = zkClient.getData(CLUSTER_STATE, watcher, stat, true); @@ -487,6 +508,20 @@ public class ZkStateReader implements Closeable { // Nothing to do, someone else updated same or newer. 
return; } + Set liveNodes = this.liveNodes; // volatile read + for (Map.Entry watchEntry : this.collectionWatches.entrySet()) { + String coll = watchEntry.getKey(); + CollectionWatch collWatch = watchEntry.getValue(); + ClusterState.CollectionRef ref = this.legacyCollectionStates.get(coll); + if (ref == null) + continue; + // legacy collections are always in-memory + DocCollection newState = ref.get(); + if (!collWatch.stateWatchers.isEmpty() + && !Objects.equals(loadedData.getCollectionStates().get(coll).get(), newState)) { + notifyStateWatchers(liveNodes, coll, newState); + } + } this.legacyCollectionStates = loadedData.getCollectionStates(); this.legacyClusterStateVersion = stat.getVersion(); } @@ -503,9 +538,8 @@ public class ZkStateReader implements Closeable { * Refresh state format2 collections. */ private void refreshStateFormat2Collections() { - // It's okay if no format2 state.json exists, if one did not previous exist. - for (String coll : interestingCollections) { - new StateWatcher(coll).refreshAndWatch(watchedCollectionStates.containsKey(coll)); + for (String coll : collectionWatches.keySet()) { + new StateWatcher(coll).refreshAndWatch(); } } @@ -546,7 +580,7 @@ public class ZkStateReader implements Closeable { this.lazyCollectionStates.keySet().retainAll(children); for (String coll : children) { // We will create an eager collection for any interesting collections, so don't add to lazy. - if (!interestingCollections.contains(coll)) { + if (!collectionWatches.containsKey(coll)) { // Double check contains just to avoid allocating an object. 
LazyCollectionRef existing = lazyCollectionStates.get(coll); if (existing == null) { @@ -637,6 +671,7 @@ public class ZkStateReader implements Closeable { public void close() { this.closed = true; + notifications.shutdown(); if (closeClient) { zkClient.close(); } @@ -888,7 +923,7 @@ public class ZkStateReader implements Closeable { return; } - if (!interestingCollections.contains(coll)) { + if (!collectionWatches.containsKey(coll)) { // This collection is no longer interesting, stop watching. LOG.info("Uninteresting collection {}", coll); return; @@ -899,27 +934,22 @@ public class ZkStateReader implements Closeable { LOG.info("A cluster state change: [{}] for collection [{}] has occurred - updating... (live nodes size: [{}])", event, coll, liveNodesSize); - refreshAndWatch(true); + refreshAndWatch(); synchronized (getUpdateLock()) { constructState(); } + } /** * Refresh collection state from ZK and leave a watch for future changes. * As a side effect, updates {@link #clusterState} and {@link #watchedCollectionStates} * with the results of the refresh. 
- * - * @param expectExists if true, error if no state node exists */ - public void refreshAndWatch(boolean expectExists) { + public void refreshAndWatch() { try { DocCollection newState = fetchCollectionState(coll, this); updateWatchedCollection(coll, newState); - } catch (KeeperException.NoNodeException e) { - if (expectExists) { - LOG.warn("State node vanished for collection: [{}]", coll, e); - } } catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) { LOG.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage()); } catch (KeeperException e) { @@ -1071,32 +1101,190 @@ public class ZkStateReader implements Closeable { return COLLECTIONS_ZKNODE+"/"+coll + "/state.json"; } - public void addCollectionWatch(String coll) { - if (interestingCollections.add(coll)) { - LOG.info("addZkWatch [{}]", coll); - new StateWatcher(coll).refreshAndWatch(false); + /** + * Notify this reader that a local Core is a member of a collection, and so that collection + * state should be watched. + * + * Not a public API. This method should only be called from ZkController. + * + * The number of cores per-collection is tracked, and adding multiple cores from the same + * collection does not increase the number of watches. + * + * @param collection the collection that the core is a member of + * + * @see ZkStateReader#unregisterCore(String) + */ + public void registerCore(String collection) { + AtomicBoolean reconstructState = new AtomicBoolean(false); + collectionWatches.compute(collection, (k, v) -> { + if (v == null) { + reconstructState.set(true); + v = new CollectionWatch(); + } + v.coreRefCount++; + return v; + }); + if (reconstructState.get()) { + new StateWatcher(collection).refreshAndWatch(); synchronized (getUpdateLock()) { constructState(); } } } + /** + * Notify this reader that a local core that is a member of a collection has been closed. + * + * Not a public API. 
This method should only be called from ZkController. + * + * If no cores are registered for a collection, and there are no {@link CollectionStateWatcher}s + * for that collection either, the collection watch will be removed. + * + * @param collection the collection that the core belongs to + */ + public void unregisterCore(String collection) { + AtomicBoolean reconstructState = new AtomicBoolean(false); + collectionWatches.compute(collection, (k, v) -> { + if (v == null) + return null; + if (v.coreRefCount > 0) + v.coreRefCount--; + if (v.canBeRemoved()) { + watchedCollectionStates.remove(collection); + lazyCollectionStates.put(collection, new LazyCollectionRef(collection)); + reconstructState.set(true); + return null; + } + return v; + }); + if (reconstructState.get()) { + synchronized (getUpdateLock()) { + constructState(); + } + } + } + + /** + * Register a CollectionStateWatcher to be called when the state of a collection changes + * + * A given CollectionStateWatcher will be only called once. If you want to have a persistent watcher, + * it should register itself again in its {@link CollectionStateWatcher#onStateChanged(Set, DocCollection)} + * method. + */ + public void registerCollectionStateWatcher(String collection, CollectionStateWatcher stateWatcher) { + AtomicBoolean watchSet = new AtomicBoolean(false); + collectionWatches.compute(collection, (k, v) -> { + if (v == null) { + v = new CollectionWatch(); + watchSet.set(true); + } + v.stateWatchers.add(stateWatcher); + return v; + }); + if (watchSet.get()) { + new StateWatcher(collection).refreshAndWatch(); + synchronized (getUpdateLock()) { + constructState(); + } + } + } + + /** + * Block until a CollectionStatePredicate returns true, or the wait times out + * + * Note that the predicate may be called again even after it has returned true, so + * implementors should avoid changing state within the predicate call itself. 
+ * + * @param collection the collection to watch + * @param wait how long to wait + * @param unit the units of the wait parameter + * @param predicate the predicate to call on state changes + * @throws InterruptedException on interrupt + * @throws TimeoutException on timeout + */ + public void waitForState(final String collection, long wait, TimeUnit unit, CollectionStatePredicate predicate) + throws InterruptedException, TimeoutException { + + final CountDownLatch latch = new CountDownLatch(1); + + CollectionStateWatcher watcher = new CollectionStateWatcher() { + @Override + public void onStateChanged(Set liveNodes, DocCollection collectionState) { + if (predicate.matches(liveNodes, collectionState)) { + latch.countDown(); + } else { + registerCollectionStateWatcher(collection, this); + } + } + }; + registerCollectionStateWatcher(collection, watcher); + + try { + // check the current state + DocCollection dc = clusterState.getCollectionOrNull(collection); + if (predicate.matches(liveNodes, dc)) + return; + + // wait for the watcher predicate to return true, or time out + if (!latch.await(wait, unit)) + throw new TimeoutException(); + + } + finally { + removeCollectionStateWatcher(collection, watcher); + } + } + + /** + * Remove a watcher from a collection's watch list. + * + * This allows Zookeeper watches to be removed if there is no interest in the + * collection. 
+ * + * @param collection the collection + * @param watcher the watcher + */ + public void removeCollectionStateWatcher(String collection, CollectionStateWatcher watcher) { + collectionWatches.compute(collection, (k, v) -> { + if (v == null) + return null; + v.stateWatchers.remove(watcher); + if (v.canBeRemoved()) + return null; + return v; + }); + } + + /* package-private for testing */ + Set getStateWatchers(String collection) { + CollectionWatch watch = collectionWatches.get(collection); + if (watch == null) + return null; + return new HashSet<>(watch.stateWatchers); + } + + // updates the cached state of a watched collection and notifies its state watchers when that state changes private void updateWatchedCollection(String coll, DocCollection newState) { + + Set liveNodes = this.liveNodes; // volatile read + if (newState == null) { LOG.info("Deleting data for [{}]", coll); watchedCollectionStates.remove(coll); + notifyStateWatchers(liveNodes, coll, null); return; } // CAS update loop while (true) { - if (!interestingCollections.contains(coll)) { + if (!collectionWatches.containsKey(coll)) { break; } DocCollection oldState = watchedCollectionStates.get(coll); if (oldState == null) { if (watchedCollectionStates.putIfAbsent(coll, newState) == null) { LOG.info("Add data for [{}] ver [{}]", coll, newState.getZNodeVersion()); + notifyStateWatchers(liveNodes, coll, newState); break; } } else { @@ -1106,27 +1294,18 @@ public class ZkStateReader implements Closeable { } if (watchedCollectionStates.replace(coll, oldState, newState)) { LOG.info("Updating data for [{}] from [{}] to [{}]", coll, oldState.getZNodeVersion(), newState.getZNodeVersion()); + notifyStateWatchers(liveNodes, coll, newState); break; } } } - // Resolve race with removeZKWatch. - if (!interestingCollections.contains(coll)) { + // Resolve race with unregisterCore. + if (!collectionWatches.containsKey(coll)) { watchedCollectionStates.remove(coll); LOG.info("Removing uninteresting collection [{}]", coll); } - } - - /** This is not a public API.
Only used by ZkController */ - public void removeZKWatch(String coll) { - LOG.info("Removing watch for uninteresting collection [{}]", coll); - interestingCollections.remove(coll); - watchedCollectionStates.remove(coll); - lazyCollectionStates.put(coll, new LazyCollectionRef(coll)); - synchronized (getUpdateLock()) { - constructState(); - } + } public static class ConfigData { @@ -1142,4 +1321,45 @@ public class ZkStateReader implements Closeable { } } + + private void notifyStateWatchers(Set liveNodes, String collection, DocCollection collectionState) { + try { + notifications.submit(new Notification(liveNodes, collection, collectionState)); + } + catch (RejectedExecutionException e) { + if (closed == false) { + LOG.error("Couldn't run collection notifications for {}", collection, e); + } + } + } + + private class Notification implements Runnable { + + final Set liveNodes; + final String collection; + final DocCollection collectionState; + + private Notification(Set liveNodes, String collection, DocCollection collectionState) { + this.liveNodes = liveNodes; + this.collection = collection; + this.collectionState = collectionState; + } + + @Override + public void run() { + List watchers = new ArrayList<>(); + collectionWatches.compute(collection, (k, v) -> { + if (v == null) + return null; + watchers.addAll(v.stateWatchers); + v.stateWatchers.clear(); + return v; + }); + for (CollectionStateWatcher watcher : watchers) { + watcher.onStateChanged(liveNodes, collectionState); + } + } + + } + } diff --git a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java index b8d9ac41b0a..5f307a8bcc0 100644 --- a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java +++ b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java @@ -19,16 +19,9 @@ package org.apache.solr.common.util; import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.Collection; - 
-import java.util.Enumeration; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionHandler; @@ -153,6 +146,13 @@ public class ExecutorUtil { threadFactory); } + /** + * Create a cached thread pool using a named thread factory + */ + public static ExecutorService newMDCAwareCachedThreadPool(String name) { + return newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory(name)); + } + /** * See {@link java.util.concurrent.Executors#newCachedThreadPool(ThreadFactory)} */ diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java new file mode 100644 index 00000000000..2862f08525d --- /dev/null +++ b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java @@ -0,0 +1,238 @@ +package org.apache.solr.common.cloud; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.lang.invoke.MethodHandles; +import java.util.HashMap; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.solr.client.solrj.embedded.JettySolrRunner; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.cloud.SolrCloudTestCase; +import org.apache.solr.common.util.ExecutorUtil; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.hamcrest.core.Is.is; + +public class TestCollectionStateWatchers extends SolrCloudTestCase { + + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static final int CLUSTER_SIZE = 4; + + private static final ExecutorService executor = ExecutorUtil.newMDCAwareCachedThreadPool("backgroundWatchers"); + + private static final int MAX_WAIT_TIMEOUT = 30; + + @BeforeClass + public static void startCluster() throws Exception { + configureCluster(CLUSTER_SIZE) + .addConfig("config", getFile("solrj/solr/collection1/conf").toPath()) + .configure(); + } + + @AfterClass + public static void shutdownBackgroundExecutors() { + executor.shutdown(); + } + + @Before + public void prepareCluster() throws Exception { + int missingServers = CLUSTER_SIZE - cluster.getJettySolrRunners().size(); + for (int i = 0; i < missingServers; i++) { + cluster.startJettySolrRunner(); + } + cluster.waitForAllNodes(30); + } + + private static Future waitInBackground(String collection, long timeout, TimeUnit 
unit, + CollectionStatePredicate predicate) { + return executor.submit(() -> { + try { + cluster.getSolrClient().waitForState(collection, timeout, unit, predicate); + } catch (InterruptedException | TimeoutException e) { + return Boolean.FALSE; + } + return Boolean.TRUE; + }); + } + + + @Test + public void testSimpleCollectionWatch() throws Exception { + + CloudSolrClient client = cluster.getSolrClient(); + cluster.createCollection("testcollection", CLUSTER_SIZE, 1, "config", new HashMap<>()); + + client.waitForState("testcollection", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, DocCollection::isFullyActive); + + // shutdown a node and check that we get notified about the change + final AtomicInteger nodeCount = new AtomicInteger(0); + final CountDownLatch latch = new CountDownLatch(1); + client.registerCollectionStateWatcher("testcollection", (liveNodes, collectionState) -> { + // we can't just count liveNodes here, because that's updated by a separate watcher, + // and it may be the case that we're triggered by a node setting itself to DOWN before + // the liveNodes watcher is called + log.info("State changed: {}", collectionState); + for (Slice slice : collectionState) { + for (Replica replica : slice) { + if (replica.isActive(liveNodes)) + nodeCount.incrementAndGet(); + } + } + latch.countDown(); + }); + + cluster.stopJettySolrRunner(random().nextInt(cluster.getJettySolrRunners().size())); + assertTrue("CollectionStateWatcher was never notified of cluster change", latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS)); + + assertThat(nodeCount.intValue(), is(3)); + + } + + @Test + public void testWaitForStateChecksCurrentState() throws Exception { + + CloudSolrClient client = cluster.getSolrClient(); + cluster.createCollection("waitforstate", 1, 1, "config", new HashMap<>()); + + client.waitForState("waitforstate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, DocCollection::isFullyActive); + + // several goes, to check that we're not getting delayed state changes + for (int i = 0; i < 
10; i++) { + try { + client.waitForState("waitforstate", 1, TimeUnit.SECONDS, DocCollection::isFullyActive); + } + catch (TimeoutException e) { + fail("waitForState should return immediately if the predicate is already satisfied"); + } + } + + } + + @Test + public void testCanWaitForNonexistantCollection() throws Exception { + + Future future = waitInBackground("delayed", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, DocCollection::isFullyActive); + cluster.createCollection("delayed", 1, 1, "config", new HashMap<>()); + assertTrue("waitForState was not triggered by collection creation", future.get()); + + } + + @Test + public void testPredicateFailureTimesOut() throws Exception { + CloudSolrClient client = cluster.getSolrClient(); + expectThrows(TimeoutException.class, () -> { + client.waitForState("nosuchcollection", 1, TimeUnit.SECONDS, ((liveNodes, collectionState) -> false)); + }); + Set watchers = client.getZkStateReader().getStateWatchers("nosuchcollection"); + assertTrue("Watchers for collection should be removed after timeout", + watchers == null || watchers.size() == 0); + + } + + @Test + public void testWaitForStateWatcherIsRetainedOnPredicateFailure() throws Exception { + + CloudSolrClient client = cluster.getSolrClient(); + cluster.createCollection("falsepredicate", 4, 1, "config", new HashMap<>()); + client.waitForState("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, DocCollection::isFullyActive); + + final CountDownLatch firstCall = new CountDownLatch(1); + + // stop a node, then add a watch waiting for all nodes to be back up + JettySolrRunner node1 = cluster.stopJettySolrRunner(random().nextInt(cluster.getJettySolrRunners().size())); + + Future future = waitInBackground("falsepredicate", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, (liveNodes, collectionState) -> { + firstCall.countDown(); + return DocCollection.isFullyActive(liveNodes, collectionState); + }); + + // first, stop another node; the watch should not be fired after this! 
+ JettySolrRunner node2 = cluster.stopJettySolrRunner(random().nextInt(cluster.getJettySolrRunners().size())); + + // now start them both back up + cluster.startJettySolrRunner(node1); + assertTrue("CollectionStateWatcher not called after 30 seconds", firstCall.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS)); + cluster.startJettySolrRunner(node2); + + Boolean result = future.get(); + assertTrue("Did not see a fully active cluster after 30 seconds", result); + + } + + @Test + public void testWatcherIsRemovedAfterTimeout() { + CloudSolrClient client = cluster.getSolrClient(); + assertTrue("There should be no watchers for a non-existent collection!", + client.getZkStateReader().getStateWatchers("no-such-collection") == null); + + expectThrows(TimeoutException.class, () -> { + client.waitForState("no-such-collection", 10, TimeUnit.MILLISECONDS, DocCollection::isFullyActive); + }); + + Set watchers = client.getZkStateReader().getStateWatchers("no-such-collection"); + assertTrue("Watchers for collection should be removed after timeout", + watchers == null || watchers.size() == 0); + + } + + @Test + public void testDeletionsTriggerWatches() throws Exception { + cluster.createCollection("tobedeleted", 1, 1, "config", new HashMap<>()); + Future future = waitInBackground("tobedeleted", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, (l, c) -> c == null); + + CollectionAdminRequest.deleteCollection("tobedeleted").process(cluster.getSolrClient()); + + assertTrue("CollectionStateWatcher not notified of delete call after 30 seconds", future.get()); + } + + @Test + public void testWatchesWorkForStateFormat1() throws Exception { + + final CloudSolrClient client = cluster.getSolrClient(); + + Future future + = waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, DocCollection::isFullyActive); + + CollectionAdminRequest.createCollection("stateformat1", "config", 1, 1).setStateFormat(1) + .processAndWait(client, MAX_WAIT_TIMEOUT); + assertTrue("CollectionStateWatcher not notified of 
stateformat=1 collection creation", future.get()); + + Future migrated + = waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS, + (n, c) -> c != null && c.getStateFormat() == 2); + + CollectionAdminRequest.migrateCollectionFormat("stateformat1").processAndWait(client, MAX_WAIT_TIMEOUT); + assertTrue("CollectionStateWatcher did not persist over state format migration", migrated.get()); + + } + +} diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java index d23b37cd4b8..f51116246eb 100644 --- a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java +++ b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java @@ -27,6 +27,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.SortedMap; import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList; @@ -181,8 +182,8 @@ public class MiniSolrCloudCluster { */ public MiniSolrCloudCluster(int numServers, Path baseDir, String solrXml, JettyConfig jettyConfig, ZkTestServer zkTestServer) throws Exception { - this.baseDir = baseDir; - this.jettyConfig = jettyConfig; + this.baseDir = Objects.requireNonNull(baseDir); + this.jettyConfig = Objects.requireNonNull(jettyConfig); Files.createDirectories(baseDir); @@ -194,8 +195,7 @@ public class MiniSolrCloudCluster { } this.zkServer = zkTestServer; - try(SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), - AbstractZkTestCase.TIMEOUT, AbstractZkTestCase.TIMEOUT, null)) { + try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), AbstractZkTestCase.TIMEOUT)) { zkClient.makePath("/solr/solr.xml", solrXml.getBytes(Charset.defaultCharset()), true); if (jettyConfig.sslConfig != null && jettyConfig.sslConfig.isSSLMode()) { zkClient.makePath("/solr" + 
ZkStateReader.CLUSTER_PROPS, "{'urlScheme':'https'}".getBytes(Charsets.UTF_8), true); @@ -222,12 +222,17 @@ public class MiniSolrCloudCluster { throw startupError; } - try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), - AbstractZkTestCase.TIMEOUT, 45000, null)) { + waitForAllNodes(numServers, 60); + + solrClient = buildSolrClient(); + } + + private void waitForAllNodes(int numServers, int timeout) throws IOException, InterruptedException { + try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), AbstractZkTestCase.TIMEOUT)) { int numliveNodes = 0; - int retries = 60; + int retries = timeout; String liveNodesPath = "/solr/live_nodes"; - // Wait up to 60 seconds for number of live_nodes to match up number of servers + // Wait up to {timeout} seconds for number of live_nodes to match up number of servers do { if (zkClient.exists(liveNodesPath, true)) { numliveNodes = zkClient.getChildren(liveNodesPath, null, true).size(); @@ -244,8 +249,13 @@ public class MiniSolrCloudCluster { Thread.sleep(1000); } while (numliveNodes != numServers); } + catch (KeeperException e) { + throw new IOException("Error communicating with zookeeper", e); + } + } - solrClient = buildSolrClient(); + public void waitForAllNodes(int timeout) throws IOException, InterruptedException { + waitForAllNodes(jettys.size(), timeout); } private String newNodeName() { @@ -348,7 +358,13 @@ public class MiniSolrCloudCluster { return jetty; } - protected JettySolrRunner startJettySolrRunner(JettySolrRunner jetty) throws Exception { + /** + * Add a previously stopped node back to the cluster + * @param jetty a {@link JettySolrRunner} previously returned by {@link #stopJettySolrRunner(int)} + * @return the started node + * @throws Exception on error + */ + public JettySolrRunner startJettySolrRunner(JettySolrRunner jetty) throws Exception { jetty.start(); jettys.add(jetty); return jetty;