From 015e0fc1cf1d581c9657cd8f5588062c02588793 Mon Sep 17 00:00:00 2001 From: Shalin Shekhar Mangar Date: Thu, 30 Jun 2016 15:22:57 +0530 Subject: [PATCH] SOLR-9264: Optimize ZkController.publishAndWaitForDownStates to not read all collection states and watch relevant collections instead --- solr/CHANGES.txt | 3 + .../org/apache/solr/cloud/ZkController.java | 66 ++++++++----------- 2 files changed, 29 insertions(+), 40 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 94b07a32a43..59f411f73c5 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -106,6 +106,9 @@ Optimizations * SOLR-9219: Make hdfs blockcache read buffer sizes configurable and improve cache concurrency. (Mark Miller) +* SOLR-9264: Optimize ZkController.publishAndWaitForDownStates to not read all collection states and + watch relevant collections instead. (Hrishikesh Gadre, shalin) + Other Changes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index 102774ff66c..c4cd36c0c5d 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -37,11 +37,13 @@ import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.base.Strings; import org.apache.commons.lang.StringUtils; @@ -52,25 +54,7 @@ import org.apache.solr.cloud.overseer.OverseerAction; import org.apache.solr.cloud.overseer.SliceMutator; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; -import org.apache.solr.common.cloud.BeforeReconnect; -import org.apache.solr.common.cloud.ClusterState; -import org.apache.solr.common.cloud.ClusterStateUtil; -import org.apache.solr.common.cloud.DefaultConnectionStrategy; -import org.apache.solr.common.cloud.DefaultZkACLProvider; -import org.apache.solr.common.cloud.DefaultZkCredentialsProvider; -import org.apache.solr.common.cloud.DocCollection; -import org.apache.solr.common.cloud.OnReconnect; -import org.apache.solr.common.cloud.Replica; -import org.apache.solr.common.cloud.Slice; -import org.apache.solr.common.cloud.SolrZkClient; -import org.apache.solr.common.cloud.ZkACLProvider; -import org.apache.solr.common.cloud.ZkCmdExecutor; -import org.apache.solr.common.cloud.ZkConfigManager; -import org.apache.solr.common.cloud.ZkCoreNodeProps; -import org.apache.solr.common.cloud.ZkCredentialsProvider; -import org.apache.solr.common.cloud.ZkNodeProps; -import org.apache.solr.common.cloud.ZkStateReader; -import org.apache.solr.common.cloud.ZooKeeperException; +import org.apache.solr.common.cloud.*; import org.apache.solr.common.params.CollectionParams; import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.SolrParams; @@ -93,6 +77,7 @@ import org.apache.zookeeper.Op; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.data.Stat; +import org.eclipse.jetty.util.ConcurrentHashSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; @@ -746,35 +731,36 @@ public final class ZkController { InterruptedException { publishNodeAsDown(getNodeName()); - - // now wait till the updates are in our state - long now = System.nanoTime(); - long timeout = now + TimeUnit.NANOSECONDS.convert(WAIT_DOWN_STATES_TIMEOUT_SECONDS, TimeUnit.SECONDS); - while (System.nanoTime() < timeout) { - boolean foundStates = true; - ClusterState clusterState = zkStateReader.getClusterState(); - Map collections = clusterState.getCollectionsMap(); - for (Map.Entry entry : collections.entrySet()) { - DocCollection collection = entry.getValue(); - Collection slices = collection.getSlices(); - for (Slice slice : slices) { - Collection replicas = slice.getReplicas(); - for (Replica replica : replicas) { - if (getNodeName().equals(replica.getNodeName()) && replica.getState() != Replica.State.DOWN) { + Set collectionsWithLocalReplica = ConcurrentHashMap.newKeySet(); + for (SolrCore core : cc.getCores()) { + collectionsWithLocalReplica.add(core.getCoreDescriptor().getCloudDescriptor().getCollectionName()); + } + + CountDownLatch latch = new CountDownLatch(collectionsWithLocalReplica.size()); + for (String collectionWithLocalReplica : collectionsWithLocalReplica) { + zkStateReader.registerCollectionStateWatcher(collectionWithLocalReplica, (liveNodes, collectionState) -> { + boolean foundStates = true; + for (SolrCore core : cc.getCores()) { + if (core.getCoreDescriptor().getCloudDescriptor().getCollectionName().equals(collectionWithLocalReplica)) { + Replica replica = collectionState.getReplica(core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName()); + if (replica.getState() != Replica.State.DOWN) { foundStates = false; } } } - } - Thread.sleep(1000); - if (foundStates) { - return; - } + if (foundStates && collectionsWithLocalReplica.remove(collectionWithLocalReplica)) { + latch.countDown(); + } + return foundStates; + }); } - log.warn("Timed out waiting to see all nodes published as DOWN in our cluster state."); + boolean allPublishedDown = latch.await(WAIT_DOWN_STATES_TIMEOUT_SECONDS, TimeUnit.SECONDS); + if (!allPublishedDown) { + log.warn("Timed out waiting to see all nodes published as DOWN in our cluster state."); + } } /**