From 40c5d6b750637eab6fad3e30715a224423b3a56e Mon Sep 17 00:00:00 2001 From: Mike Drob Date: Wed, 3 Feb 2021 14:40:07 -0600 Subject: [PATCH] SOLR-14253 Avoid writes in ZKSR.waitForState (#2297) --- .../org/apache/solr/cloud/ZkController.java | 26 ++++++----------- .../api/collections/DeleteCollectionCmd.java | 2 +- .../OverseerCollectionMessageHandler.java | 29 +++++++------------ .../solr/common/cloud/ZkStateReader.java | 6 ++-- 4 files changed, 24 insertions(+), 39 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index 8a958d587ff..f5d69289e5a 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -1698,12 +1698,11 @@ public class ZkController implements Closeable { private void waitForCoreNodeName(CoreDescriptor descriptor) { log.debug("waitForCoreNodeName >>> look for our core node name"); try { - zkStateReader.waitForState(descriptor.getCollectionName(), 320L, TimeUnit.SECONDS, c -> { - String name = ClusterStateMutator.getAssignedCoreNodeName(c, getNodeName(), descriptor.getName()); - if (name == null) return false; - descriptor.getCloudDescriptor().setCoreNodeName(name); - return true; - }); + DocCollection collection = zkStateReader.waitForState(descriptor.getCollectionName(), 320L, TimeUnit.SECONDS, + c -> ClusterStateMutator.getAssignedCoreNodeName(c, getNodeName(), descriptor.getName()) != null); + // Read outside of the predicate to avoid multiple potential writes + String name = ClusterStateMutator.getAssignedCoreNodeName(collection, getNodeName(), descriptor.getName()); + descriptor.getCloudDescriptor().setCoreNodeName(name); } catch (TimeoutException | InterruptedException e) { SolrZkClient.checkInterrupted(e); throw new SolrException(ErrorCode.SERVER_ERROR, "Failed waiting for collection state", e); @@ -1716,15 +1715,10 @@ public class ZkController implements Closeable { log.debug("waiting to find shard id in clusterstate for {}", cd.getName()); } try { - zkStateReader.waitForState(cd.getCollectionName(), 320, TimeUnit.SECONDS, c -> { - if (c == null) return false; - final String shardId = c.getShardId(getNodeName(), cd.getName()); - if (shardId != null) { - cd.getCloudDescriptor().setShardId(shardId); - return true; - } - return false; - }); + DocCollection collection = zkStateReader.waitForState(cd.getCollectionName(), 320, TimeUnit.SECONDS, + c -> c != null && c.getShardId(getNodeName(), cd.getName()) != null); + // Read outside of the predicate to avoid multiple potential writes + cd.getCloudDescriptor().setShardId(collection.getShardId(getNodeName(), cd.getName())); } catch (TimeoutException | InterruptedException e) { SolrZkClient.checkInterrupted(e); throw new SolrException(ErrorCode.SERVER_ERROR, "Failed getting shard id for core: " + cd.getName(), e); @@ -1814,10 +1808,8 @@ public class ZkController implements Closeable { } AtomicReference errorMessage = new AtomicReference<>(); - AtomicReference collectionState = new AtomicReference<>(); try { zkStateReader.waitForState(cd.getCollectionName(), 10, TimeUnit.SECONDS, (c) -> { - collectionState.set(c); if (c == null) return false; Slice slice = c.getSlice(cloudDesc.getShardId()); diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java index 1e8ec09315e..3b7eb93d5dd 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java @@ -153,7 +153,7 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd ocmh.overseer.offerStateUpdate(Utils.toJSON(m)); // wait for a while until we don't see the collection - zkStateReader.waitForState(collection, 60, TimeUnit.SECONDS, (collectionState) -> collectionState == null); + zkStateReader.waitForState(collection, 60, TimeUnit.SECONDS, Objects::isNull); // we can delete any remaining unique aliases if (!aliasReferences.isEmpty()) { diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java index 6686d6794ce..23e677711e9 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java @@ -27,11 +27,11 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; import com.google.common.collect.ImmutableMap; import org.apache.commons.lang3.StringUtils; @@ -487,21 +487,15 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler, } String waitForCoreNodeName(String collectionName, String msgNodeName, String msgCore) { - AtomicReference coreNodeName = new AtomicReference<>(); try { - zkStateReader.waitForState(collectionName, 320, TimeUnit.SECONDS, c -> { - String name = ClusterStateMutator.getAssignedCoreNodeName(c, msgNodeName, msgCore); - if (name == null) { - return false; - } - coreNodeName.set(name); - return true; - }); + DocCollection collection = zkStateReader.waitForState(collectionName, 320, TimeUnit.SECONDS, c -> + ClusterStateMutator.getAssignedCoreNodeName(c, msgNodeName, msgCore) != null + ); + return ClusterStateMutator.getAssignedCoreNodeName(collection, msgNodeName, msgCore); } catch (TimeoutException | InterruptedException e) { SolrZkClient.checkInterrupted(e); throw new SolrException(ErrorCode.SERVER_ERROR, "Failed waiting for coreNodeName", e); } - return coreNodeName.get(); } ClusterState waitForNewShard(String collectionName, String sliceName) throws KeeperException, InterruptedException { @@ -609,26 +603,23 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler, Map waitToSeeReplicasInState(String collectionName, Collection coreNames) throws InterruptedException { assert coreNames.size() > 0; - Map results = new HashMap<>(); - AtomicReference lastState = new AtomicReference<>(); + Map results = new ConcurrentHashMap<>(); long maxWait = Long.getLong("solr.waitToSeeReplicasInStateTimeoutSeconds", 120); // could be a big cluster try { zkStateReader.waitForState(collectionName, maxWait, TimeUnit.SECONDS, c -> { if (c == null) return false; + // We write into a ConcurrentHashMap, which will be ok if called multiple times by multiple threads c.getSlices().stream().flatMap(slice -> slice.getReplicas().stream()) - .filter(r -> coreNames.contains(r.getCoreName())) // Only the elements that were asked for... - .filter(r -> !results.containsKey(r.getCoreName())) // ...but not the ones we've seen already... - .forEach(r -> results.put(r.getCoreName(), r)); // ...get added to the map + .filter(r -> coreNames.contains(r.getCoreName())) // Only the elements that were asked for... + .forEach(r -> results.putIfAbsent(r.getCoreName(), r)); // ...get added to the map - lastState.set(c); log.debug("Expecting {} cores, found {}", coreNames, results); return results.size() == coreNames.size(); }); } catch (TimeoutException e) { - String error = "Timed out waiting to see all replicas: " + coreNames + " in cluster state. Last state: " + lastState.get(); - throw new SolrException(ErrorCode.SERVER_ERROR, error); + throw new SolrException(ErrorCode.SERVER_ERROR, e.getMessage(), e); } return results; diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java index d3834caefb7..8aad1392b18 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -1698,16 +1698,18 @@ public class ZkStateReader implements SolrCloseable { *

* Note that the predicate may be called again even after it has returned true, so * implementors should avoid changing state within the predicate call itself. + * The predicate may also be called concurrently when multiple state changes are seen in rapid succession. *

* * @param collection the collection to watch * @param wait how long to wait * @param unit the units of the wait parameter * @param predicate the predicate to call on state changes + * @return the state of the doc collection after the predicate succeeds * @throws InterruptedException on interrupt * @throws TimeoutException on timeout */ - public void waitForState(final String collection, long wait, TimeUnit unit, Predicate predicate) + public DocCollection waitForState(final String collection, long wait, TimeUnit unit, Predicate predicate) throws InterruptedException, TimeoutException { if (log.isDebugEnabled()) { log.debug("Waiting up to {}ms for state {}", unit.toMillis(wait), predicate); @@ -1733,7 +1735,7 @@ public class ZkStateReader implements SolrCloseable { // wait for the watcher predicate to return true, or time out if (!latch.await(wait, unit)) throw new TimeoutException("Timeout waiting to see state for collection=" + collection + " :" + docCollection.get()); - + return docCollection.get(); } finally { removeDocCollectionWatcher(collection, watcher); waitLatches.remove(latch);