SOLR-9264: Optimize ZkController.publishAndWaitForDownStates to not read all collection states and watch relevant collections instead

This commit is contained in:
Shalin Shekhar Mangar 2016-06-30 15:22:57 +05:30
parent 1dc7480bcd
commit 015e0fc1cf
2 changed files with 29 additions and 40 deletions

View File

@ -106,6 +106,9 @@ Optimizations
* SOLR-9219: Make hdfs blockcache read buffer sizes configurable and improve cache concurrency. (Mark Miller) * SOLR-9219: Make hdfs blockcache read buffer sizes configurable and improve cache concurrency. (Mark Miller)
* SOLR-9264: Optimize ZkController.publishAndWaitForDownStates to not read all collection states and
watch relevant collections instead. (Hrishikesh Gadre, shalin)
Other Changes Other Changes
---------------------- ----------------------

View File

@ -37,11 +37,13 @@ import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.base.Strings; import com.google.common.base.Strings;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
@ -52,25 +54,7 @@ import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.cloud.overseer.SliceMutator; import org.apache.solr.cloud.overseer.SliceMutator;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.BeforeReconnect; import org.apache.solr.common.cloud.*;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.ClusterStateUtil;
import org.apache.solr.common.cloud.DefaultConnectionStrategy;
import org.apache.solr.common.cloud.DefaultZkACLProvider;
import org.apache.solr.common.cloud.DefaultZkCredentialsProvider;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.OnReconnect;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkACLProvider;
import org.apache.solr.common.cloud.ZkCmdExecutor;
import org.apache.solr.common.cloud.ZkConfigManager;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkCredentialsProvider;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.params.CollectionParams; import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.params.SolrParams;
@ -93,6 +77,7 @@ import org.apache.zookeeper.Op;
import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.data.Stat;
import org.eclipse.jetty.util.ConcurrentHashSet;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.slf4j.MDC; import org.slf4j.MDC;
@ -747,35 +732,36 @@ public final class ZkController {
publishNodeAsDown(getNodeName()); publishNodeAsDown(getNodeName());
// now wait till the updates are in our state Set<String> collectionsWithLocalReplica = ConcurrentHashMap.newKeySet();
long now = System.nanoTime(); for (SolrCore core : cc.getCores()) {
long timeout = now + TimeUnit.NANOSECONDS.convert(WAIT_DOWN_STATES_TIMEOUT_SECONDS, TimeUnit.SECONDS); collectionsWithLocalReplica.add(core.getCoreDescriptor().getCloudDescriptor().getCollectionName());
}
while (System.nanoTime() < timeout) { CountDownLatch latch = new CountDownLatch(collectionsWithLocalReplica.size());
for (String collectionWithLocalReplica : collectionsWithLocalReplica) {
zkStateReader.registerCollectionStateWatcher(collectionWithLocalReplica, (liveNodes, collectionState) -> {
boolean foundStates = true; boolean foundStates = true;
ClusterState clusterState = zkStateReader.getClusterState(); for (SolrCore core : cc.getCores()) {
Map<String, DocCollection> collections = clusterState.getCollectionsMap(); if (core.getCoreDescriptor().getCloudDescriptor().getCollectionName().equals(collectionWithLocalReplica)) {
for (Map.Entry<String, DocCollection> entry : collections.entrySet()) { Replica replica = collectionState.getReplica(core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName());
DocCollection collection = entry.getValue(); if (replica.getState() != Replica.State.DOWN) {
Collection<Slice> slices = collection.getSlices();
for (Slice slice : slices) {
Collection<Replica> replicas = slice.getReplicas();
for (Replica replica : replicas) {
if (getNodeName().equals(replica.getNodeName()) && replica.getState() != Replica.State.DOWN) {
foundStates = false; foundStates = false;
} }
} }
} }
if (foundStates && collectionsWithLocalReplica.remove(collectionWithLocalReplica)) {
latch.countDown();
}
return foundStates;
});
} }
Thread.sleep(1000); boolean allPublishedDown = latch.await(WAIT_DOWN_STATES_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (foundStates) { if (!allPublishedDown) {
return;
}
}
log.warn("Timed out waiting to see all nodes published as DOWN in our cluster state."); log.warn("Timed out waiting to see all nodes published as DOWN in our cluster state.");
} }
}
/** /**
* Validates if the chroot exists in zk (or if it is successfully created). * Validates if the chroot exists in zk (or if it is successfully created).