mirror of https://github.com/apache/lucene.git
SOLR-9405: ConcurrentModificationException in ZkStateReader.getStateWatchers
(cherry picked from commit f82c3b1
)
This commit is contained in:
parent
0252790962
commit
42254c2d9b
|
@ -168,6 +168,9 @@ Bug Fixes
|
||||||
|
|
||||||
* SOLR-9397: Config API does not support adding caches (noble)
|
* SOLR-9397: Config API does not support adding caches (noble)
|
||||||
|
|
||||||
|
* SOLR-9405: ConcurrentModificationException in ZkStateReader.getStateWatchers.
|
||||||
|
(Alan Woodward, Edward Ribeiro, shalin)
|
||||||
|
|
||||||
Optimizations
|
Optimizations
|
||||||
----------------------
|
----------------------
|
||||||
|
|
||||||
|
|
|
@ -147,7 +147,7 @@ public class ZkStateReader implements Closeable {
|
||||||
private class CollectionWatch {
|
private class CollectionWatch {
|
||||||
|
|
||||||
int coreRefCount = 0;
|
int coreRefCount = 0;
|
||||||
Set<CollectionStateWatcher> stateWatchers = new HashSet<>();
|
Set<CollectionStateWatcher> stateWatchers = ConcurrentHashMap.newKeySet();
|
||||||
|
|
||||||
public boolean canBeRemoved() {
|
public boolean canBeRemoved() {
|
||||||
return coreRefCount + stateWatchers.size() == 0;
|
return coreRefCount + stateWatchers.size() == 0;
|
||||||
|
@ -1273,10 +1273,14 @@ public class ZkStateReader implements Closeable {
|
||||||
|
|
||||||
/* package-private for testing */
|
/* package-private for testing */
|
||||||
Set<CollectionStateWatcher> getStateWatchers(String collection) {
|
Set<CollectionStateWatcher> getStateWatchers(String collection) {
|
||||||
CollectionWatch watch = collectionWatches.get(collection);
|
final Set<CollectionStateWatcher> watchers = new HashSet<>();
|
||||||
if (watch == null)
|
collectionWatches.compute(collection, (k, v) -> {
|
||||||
return null;
|
if (v != null) {
|
||||||
return new HashSet<>(watch.stateWatchers);
|
watchers.addAll(v.stateWatchers);
|
||||||
|
}
|
||||||
|
return v;
|
||||||
|
});
|
||||||
|
return watchers;
|
||||||
}
|
}
|
||||||
|
|
||||||
// returns true if the state has changed
|
// returns true if the state has changed
|
||||||
|
|
|
@ -19,8 +19,9 @@ package org.apache.solr.common.cloud;
|
||||||
|
|
||||||
import java.lang.invoke.MethodHandles;
|
import java.lang.invoke.MethodHandles;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Set;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
@ -81,6 +82,31 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void waitFor(String message, long timeout, TimeUnit unit, Callable<Boolean> predicate)
|
||||||
|
throws InterruptedException, ExecutionException {
|
||||||
|
Future<Boolean> future = executor.submit(() -> {
|
||||||
|
try {
|
||||||
|
while (true) {
|
||||||
|
if (predicate.call())
|
||||||
|
return true;
|
||||||
|
TimeUnit.MILLISECONDS.sleep(10);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (InterruptedException e) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
try {
|
||||||
|
if (future.get(timeout, unit) == true) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (TimeoutException e) {
|
||||||
|
// pass failure message on
|
||||||
|
}
|
||||||
|
future.cancel(true);
|
||||||
|
fail(message);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSimpleCollectionWatch() throws Exception {
|
public void testSimpleCollectionWatch() throws Exception {
|
||||||
|
@ -113,9 +139,8 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
|
||||||
cluster.stopJettySolrRunner(random().nextInt(cluster.getJettySolrRunners().size()));
|
cluster.stopJettySolrRunner(random().nextInt(cluster.getJettySolrRunners().size()));
|
||||||
assertTrue("CollectionStateWatcher was never notified of cluster change", latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
|
assertTrue("CollectionStateWatcher was never notified of cluster change", latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
|
||||||
|
|
||||||
Set<CollectionStateWatcher> watchers = client.getZkStateReader().getStateWatchers("testcollection");
|
waitFor("CollectionStateWatcher wasn't cleared after completion", 1, TimeUnit.SECONDS,
|
||||||
assertTrue("CollectionStateWatcher wasn't cleared after completion",
|
() -> client.getZkStateReader().getStateWatchers("testcollection").isEmpty());
|
||||||
watchers == null || watchers.size() == 0);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -144,8 +169,8 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
|
||||||
|
|
||||||
assertTrue("CollectionStateWatcher isn't called when registering for already-watched collection",
|
assertTrue("CollectionStateWatcher isn't called when registering for already-watched collection",
|
||||||
latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
|
latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
|
||||||
assertEquals("CollectionStateWatcher should be removed",
|
waitFor("CollectionStateWatcher should be removed", 1, TimeUnit.SECONDS,
|
||||||
1, client.getZkStateReader().getStateWatchers("currentstate").size());
|
() -> client.getZkStateReader().getStateWatchers("currentstate").size() == 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -189,9 +214,8 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
|
||||||
expectThrows(TimeoutException.class, () -> {
|
expectThrows(TimeoutException.class, () -> {
|
||||||
client.waitForState("nosuchcollection", 1, TimeUnit.SECONDS, ((liveNodes, collectionState) -> false));
|
client.waitForState("nosuchcollection", 1, TimeUnit.SECONDS, ((liveNodes, collectionState) -> false));
|
||||||
});
|
});
|
||||||
Set<CollectionStateWatcher> watchers = client.getZkStateReader().getStateWatchers("nosuchcollection");
|
waitFor("Watchers for collection should be removed after timeout", 1, TimeUnit.SECONDS,
|
||||||
assertTrue("Watchers for collection should be removed after timeout",
|
() -> client.getZkStateReader().getStateWatchers("nosuchcollection").isEmpty());
|
||||||
watchers == null || watchers.size() == 0);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -229,18 +253,17 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testWatcherIsRemovedAfterTimeout() {
|
public void testWatcherIsRemovedAfterTimeout() throws Exception {
|
||||||
CloudSolrClient client = cluster.getSolrClient();
|
CloudSolrClient client = cluster.getSolrClient();
|
||||||
assertTrue("There should be no watchers for a non-existent collection!",
|
assertTrue("There should be no watchers for a non-existent collection!",
|
||||||
client.getZkStateReader().getStateWatchers("no-such-collection") == null);
|
client.getZkStateReader().getStateWatchers("no-such-collection").isEmpty());
|
||||||
|
|
||||||
expectThrows(TimeoutException.class, () -> {
|
expectThrows(TimeoutException.class, () -> {
|
||||||
client.waitForState("no-such-collection", 10, TimeUnit.MILLISECONDS, (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
|
client.waitForState("no-such-collection", 10, TimeUnit.MILLISECONDS, (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
|
||||||
});
|
});
|
||||||
|
|
||||||
Set<CollectionStateWatcher> watchers = client.getZkStateReader().getStateWatchers("no-such-collection");
|
waitFor("Watchers for collection should be removed after timeout", 1, TimeUnit.SECONDS,
|
||||||
assertTrue("Watchers for collection should be removed after timeout",
|
() -> client.getZkStateReader().getStateWatchers("no-such-collection").isEmpty());
|
||||||
watchers == null || watchers.size() == 0);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue