diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 518f63a10d9..b2a384b2ce4 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -203,6 +203,9 @@ Bug Fixes * SOLR-9397: Config API does not support adding caches (noble) +* SOLR-9405: ConcurrentModificationException in ZkStateReader.getStateWatchers. + (Alan Woodward, Edward Ribeiro, shalin) + Optimizations ---------------------- diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java index 9df4a76aa8e..a3de324e0a7 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -147,7 +147,7 @@ public class ZkStateReader implements Closeable { private class CollectionWatch { int coreRefCount = 0; - Set stateWatchers = new HashSet<>(); + Set stateWatchers = ConcurrentHashMap.newKeySet(); public boolean canBeRemoved() { return coreRefCount + stateWatchers.size() == 0; @@ -1273,10 +1273,14 @@ public class ZkStateReader implements Closeable { /* package-private for testing */ Set getStateWatchers(String collection) { - CollectionWatch watch = collectionWatches.get(collection); - if (watch == null) - return null; - return new HashSet<>(watch.stateWatchers); + final Set watchers = new HashSet<>(); + collectionWatches.compute(collection, (k, v) -> { + if (v != null) { + watchers.addAll(v.stateWatchers); + } + return v; + }); + return watchers; } // returns true if the state has changed diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java index d959aa83c9b..fca0e35728c 100644 --- a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java +++ b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java @@ -19,8 +19,9 @@ package org.apache.solr.common.cloud; import java.lang.invoke.MethodHandles; import java.util.HashMap; -import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -81,6 +82,31 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase { }); } + private static void waitFor(String message, long timeout, TimeUnit unit, Callable predicate) + throws InterruptedException, ExecutionException { + Future future = executor.submit(() -> { + try { + while (true) { + if (predicate.call()) + return true; + TimeUnit.MILLISECONDS.sleep(10); + } + } + catch (InterruptedException e) { + return false; + } + }); + try { + if (future.get(timeout, unit) == true) { + return; + } + } + catch (TimeoutException e) { + // pass failure message on + } + future.cancel(true); + fail(message); + } @Test public void testSimpleCollectionWatch() throws Exception { @@ -113,9 +139,8 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase { cluster.stopJettySolrRunner(random().nextInt(cluster.getJettySolrRunners().size())); assertTrue("CollectionStateWatcher was never notified of cluster change", latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS)); - Set watchers = client.getZkStateReader().getStateWatchers("testcollection"); - assertTrue("CollectionStateWatcher wasn't cleared after completion", - watchers == null || watchers.size() == 0); + waitFor("CollectionStateWatcher wasn't cleared after completion", 1, TimeUnit.SECONDS, + () -> client.getZkStateReader().getStateWatchers("testcollection").isEmpty()); } @@ -144,8 +169,8 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase { assertTrue("CollectionStateWatcher isn't called when registering for already-watched collection", latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS)); - assertEquals("CollectionStateWatcher should be removed", - 1, client.getZkStateReader().getStateWatchers("currentstate").size()); + waitFor("CollectionStateWatcher should be removed", 1, TimeUnit.SECONDS, + () -> client.getZkStateReader().getStateWatchers("currentstate").size() == 1); } @Test @@ -189,9 +214,8 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase { expectThrows(TimeoutException.class, () -> { client.waitForState("nosuchcollection", 1, TimeUnit.SECONDS, ((liveNodes, collectionState) -> false)); }); - Set watchers = client.getZkStateReader().getStateWatchers("nosuchcollection"); - assertTrue("Watchers for collection should be removed after timeout", - watchers == null || watchers.size() == 0); + waitFor("Watchers for collection should be removed after timeout", 1, TimeUnit.SECONDS, + () -> client.getZkStateReader().getStateWatchers("nosuchcollection").isEmpty()); } @@ -229,18 +253,17 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase { } @Test - public void testWatcherIsRemovedAfterTimeout() { + public void testWatcherIsRemovedAfterTimeout() throws Exception { CloudSolrClient client = cluster.getSolrClient(); assertTrue("There should be no watchers for a non-existent collection!", - client.getZkStateReader().getStateWatchers("no-such-collection") == null); + client.getZkStateReader().getStateWatchers("no-such-collection").isEmpty()); expectThrows(TimeoutException.class, () -> { client.waitForState("no-such-collection", 10, TimeUnit.MILLISECONDS, (n, c) -> DocCollection.isFullyActive(n, c, 1, 1)); }); - Set watchers = client.getZkStateReader().getStateWatchers("no-such-collection"); - assertTrue("Watchers for collection should be removed after timeout", - watchers == null || watchers.size() == 0); + waitFor("Watchers for collection should be removed after timeout", 1, TimeUnit.SECONDS, + () -> client.getZkStateReader().getStateWatchers("no-such-collection").isEmpty()); }