diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 6f778eaeea7..74331b0aa14 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -252,9 +252,6 @@ Optimizations * SOLR-8744: Overseer operations performed with fine grained mutual exclusion (noble, Scott Blum) -* SOLR-9140: Replace zk polling in ZkController with CollectionStateWatchers - (Alan Woodward) - Other Changes ---------------------- * SOLR-7516: Improve javadocs for JavaBinCodec, ObjectResolver and enforce the single-usage policy. diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index 64fa54b2a93..b36e7666513 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -27,7 +27,6 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.util.*; import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -683,23 +682,35 @@ public final class ZkController { InterruptedException { publishNodeAsDown(getNodeName()); + + // now wait till the updates are in our state + long now = System.nanoTime(); + long timeout = now + TimeUnit.NANOSECONDS.convert(WAIT_DOWN_STATES_TIMEOUT_SECONDS, TimeUnit.SECONDS); + boolean foundStates = true; - Set collections = cc.getLocalCollections(); - CountDownLatch latch = new CountDownLatch(collections.size()); - - for (String collection : collections) { - zkStateReader.registerCollectionStateWatcher(collection, (nodes, state) -> { - for (Replica replica : state.getReplicasOnNode(getNodeName())) { - if (replica.getState() != Replica.State.DOWN) - return false; + while (System.nanoTime() < timeout) { + ClusterState clusterState = zkStateReader.getClusterState(); + Map collections = clusterState.getCollectionsMap(); + for (Map.Entry entry : collections.entrySet()) { + DocCollection collection = entry.getValue(); + Collection slices = collection.getSlices(); + for (Slice slice : slices) { + Collection replicas = slice.getReplicas(); + for (Replica replica : replicas) { + if (getNodeName().equals(replica.getNodeName()) && replica.getState() != Replica.State.DOWN) { + foundStates = false; + } + } } - latch.countDown(); - return true; - }); - } + } - if (latch.await(WAIT_DOWN_STATES_TIMEOUT_SECONDS, TimeUnit.SECONDS) == false) { - // TODO should we abort here? + if (foundStates) { + Thread.sleep(1000); + break; + } + Thread.sleep(1000); + } + if (!foundStates) { log.warn("Timed out waiting to see all nodes published as DOWN in our cluster state."); } @@ -1355,7 +1366,7 @@ public final class ZkController { return zkStateReader; } - private void doGetShardIdAndNodeNameProcess(CoreDescriptor cd) throws InterruptedException { + private void doGetShardIdAndNodeNameProcess(CoreDescriptor cd) { final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName(); if (coreNodeName != null) { @@ -1367,45 +1378,58 @@ public final class ZkController { } } - private void waitForCoreNodeName(CoreDescriptor descriptor) throws InterruptedException { - log.info("Waiting for coreNodeName for core {} in collection {} to be assigned", - descriptor.getName(), descriptor.getCollectionName()); - final String thisNode = getNodeName(); - try { - zkStateReader.waitForState(descriptor.getCollectionName(), 320, TimeUnit.SECONDS, (n, c) -> { - if (c == null) - return false; - for (Replica replica : c.getReplicasOnNode(thisNode)) { - if (descriptor.getName().equals(replica.getCoreName())) { - descriptor.getCloudDescriptor().setCoreNodeName(replica.getName()); - return true; + private void waitForCoreNodeName(CoreDescriptor descriptor) { + int retryCount = 320; + log.info("look for our core node name"); + while (retryCount-- > 0) { + Map slicesMap = zkStateReader.getClusterState() + .getSlicesMap(descriptor.getCloudDescriptor().getCollectionName()); + if (slicesMap != null) { + + for (Slice slice : slicesMap.values()) { + for (Replica replica : slice.getReplicas()) { + // TODO: for really large clusters, we could 'index' on this + + String nodeName = replica.getStr(ZkStateReader.NODE_NAME_PROP); + String core = replica.getStr(ZkStateReader.CORE_NAME_PROP); + + String msgNodeName = getNodeName(); + String msgCore = descriptor.getName(); + + if (msgNodeName.equals(nodeName) && core.equals(msgCore)) { + descriptor.getCloudDescriptor() + .setCoreNodeName(replica.getName()); + return; + } } } - return false; - }); - } catch (TimeoutException e) { - throw new SolrException(ErrorCode.SERVER_ERROR, "Timed out getting coreNodeName for " + descriptor.getName()); + } + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } } - private void waitForShardId(CoreDescriptor cd) throws InterruptedException { + private void waitForShardId(CoreDescriptor cd) { log.info("waiting to find shard id in clusterstate for " + cd.getName()); - final String thisNode = getNodeName(); - try { - zkStateReader.waitForState(cd.getCollectionName(), 320, TimeUnit.SECONDS, (n, c) -> { - if (c == null) - return false; - String shardId = c.getShardId(thisNode, cd.getName()); - if (shardId != null) { - cd.getCloudDescriptor().setShardId(shardId); - return true; - } - return false; - }); - } - catch (TimeoutException e) { - throw new SolrException(ErrorCode.SERVER_ERROR, "Timed out getting shard id for core: " + cd.getName()); + int retryCount = 320; + while (retryCount-- > 0) { + final String shardId = zkStateReader.getClusterState().getShardId(cd.getCollectionName(), getNodeName(), cd.getName()); + if (shardId != null) { + cd.getCloudDescriptor().setShardId(shardId); + return; + } + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } + + throw new SolrException(ErrorCode.SERVER_ERROR, + "Could not get shard id for core: " + cd.getName()); } @@ -1419,7 +1443,7 @@ public final class ZkController { return coreNodeName; } - public void preRegister(CoreDescriptor cd) throws InterruptedException { + public void preRegister(CoreDescriptor cd) { String coreNodeName = getCoreNodeName(cd); diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java index f291baecdce..b55cc550bde 100644 --- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java +++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java @@ -26,12 +26,11 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Properties; -import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.stream.Collectors; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; @@ -42,7 +41,6 @@ import org.apache.solr.cloud.Overseer; import org.apache.solr.cloud.ZkController; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; -import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.IOUtils; @@ -822,7 +820,6 @@ public class CoreContainer { return core; } catch (Exception e) { - SolrZkClient.checkInterrupted(e); coreInitFailures.put(dcore.getName(), new CoreLoadFailure(dcore, e)); log.error("Error creating core [{}]: {}", dcore.getName(), e.getMessage(), e); final SolrException solrException = new SolrException(ErrorCode.SERVER_ERROR, "Unable to create core [" + dcore.getName() + "]", e); @@ -872,17 +869,6 @@ public class CoreContainer { } - /** - * @return a Set containing the names of all collections with a core hosted in this container - */ - public Set getLocalCollections() { - Set collections = getCoreDescriptors().stream() - .filter(cd -> cd.getCollectionName() != null) - .map(CoreDescriptor::getCollectionName) - .collect(Collectors.toSet()); - return collections; - } - /** * Returns an immutable Map of Exceptions that occured when initializing * SolrCores (either at startup, or do to runtime requests to create cores) diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java index 23074630662..452c7a15584 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java @@ -1104,7 +1104,7 @@ public abstract class CollectionAdminRequest deleteAsyncId(requestId).process(client); return state; } - TimeUnit.MILLISECONDS.sleep(100); + TimeUnit.SECONDS.sleep(1); } return state; } diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java index 9848e65d041..5504a8b1ab2 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.stream.Collectors; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; @@ -260,26 +259,4 @@ public class DocCollection extends ZkNodeProps implements Iterable { } return replicas; } - - /** - * Get all the replicas on a particular node - */ - public List getReplicasOnNode(String nodeName) { - return getReplicas().stream() - .filter(replica -> replica.getNodeName().equals(nodeName)) - .collect(Collectors.toList()); - } - - /** - * Get the shardId of a core on a specific node - */ - public String getShardId(String nodeName, String coreName) { - for (Slice slice : this) { - for (Replica replica : slice) { - if (Objects.equals(replica.getNodeName(), nodeName) && Objects.equals(replica.getCoreName(), coreName)) - return slice.getName(); - } - } - return null; - } } diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java index 8716dbecedf..2b2e181b68a 100644 --- a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java +++ b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java @@ -51,9 +51,8 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase { @BeforeClass public static void startCluster() throws Exception { configureCluster(CLUSTER_SIZE) - .addConfig("config", getFile("solrj/solr/configsets/streaming/conf").toPath()) + .addConfig("config", getFile("solrj/solr/collection1/conf").toPath()) .configure(); - cluster.getSolrClient().connect(); } @AfterClass @@ -260,7 +259,7 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase { final CloudSolrClient client = cluster.getSolrClient(); - Future future = waitInBackground("stateformat1", 30, TimeUnit.SECONDS, + Future future = waitInBackground("stateformat1", 10, TimeUnit.SECONDS, (n, c) -> DocCollection.isFullyActive(n, c, 1, 1)); CollectionAdminRequest.createCollection("stateformat1", "config", 1, 1).setStateFormat(1)