SOLR-9405: ConcurrentModificationException in ZkStateReader.getStateWatchers

This commit is contained in:
Shalin Shekhar Mangar 2016-08-12 17:33:32 +05:30
parent 92b5a76b54
commit f82c3b1206
3 changed files with 49 additions and 19 deletions

View File

@ -203,6 +203,9 @@ Bug Fixes
* SOLR-9397: Config API does not support adding caches (noble) * SOLR-9397: Config API does not support adding caches (noble)
* SOLR-9405: ConcurrentModificationException in ZkStateReader.getStateWatchers.
(Alan Woodward, Edward Ribeiro, shalin)
Optimizations Optimizations
---------------------- ----------------------

View File

@ -147,7 +147,7 @@ public class ZkStateReader implements Closeable {
private class CollectionWatch { private class CollectionWatch {
int coreRefCount = 0; int coreRefCount = 0;
Set<CollectionStateWatcher> stateWatchers = new HashSet<>(); Set<CollectionStateWatcher> stateWatchers = ConcurrentHashMap.newKeySet();
public boolean canBeRemoved() { public boolean canBeRemoved() {
return coreRefCount + stateWatchers.size() == 0; return coreRefCount + stateWatchers.size() == 0;
@ -1273,10 +1273,14 @@ public class ZkStateReader implements Closeable {
/* package-private for testing */ /* package-private for testing */
Set<CollectionStateWatcher> getStateWatchers(String collection) { Set<CollectionStateWatcher> getStateWatchers(String collection) {
CollectionWatch watch = collectionWatches.get(collection); final Set<CollectionStateWatcher> watchers = new HashSet<>();
if (watch == null) collectionWatches.compute(collection, (k, v) -> {
return null; if (v != null) {
return new HashSet<>(watch.stateWatchers); watchers.addAll(v.stateWatchers);
}
return v;
});
return watchers;
} }
// returns true if the state has changed // returns true if the state has changed

View File

@ -19,8 +19,9 @@ package org.apache.solr.common.cloud;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.util.HashMap; import java.util.HashMap;
import java.util.Set; import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -81,6 +82,31 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
}); });
} }
private static void waitFor(String message, long timeout, TimeUnit unit, Callable<Boolean> predicate)
throws InterruptedException, ExecutionException {
Future<Boolean> future = executor.submit(() -> {
try {
while (true) {
if (predicate.call())
return true;
TimeUnit.MILLISECONDS.sleep(10);
}
}
catch (InterruptedException e) {
return false;
}
});
try {
if (future.get(timeout, unit) == true) {
return;
}
}
catch (TimeoutException e) {
// pass failure message on
}
future.cancel(true);
fail(message);
}
@Test @Test
public void testSimpleCollectionWatch() throws Exception { public void testSimpleCollectionWatch() throws Exception {
@ -113,9 +139,8 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
cluster.stopJettySolrRunner(random().nextInt(cluster.getJettySolrRunners().size())); cluster.stopJettySolrRunner(random().nextInt(cluster.getJettySolrRunners().size()));
assertTrue("CollectionStateWatcher was never notified of cluster change", latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS)); assertTrue("CollectionStateWatcher was never notified of cluster change", latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
Set<CollectionStateWatcher> watchers = client.getZkStateReader().getStateWatchers("testcollection"); waitFor("CollectionStateWatcher wasn't cleared after completion", 1, TimeUnit.SECONDS,
assertTrue("CollectionStateWatcher wasn't cleared after completion", () -> client.getZkStateReader().getStateWatchers("testcollection").isEmpty());
watchers == null || watchers.size() == 0);
} }
@ -144,8 +169,8 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
assertTrue("CollectionStateWatcher isn't called when registering for already-watched collection", assertTrue("CollectionStateWatcher isn't called when registering for already-watched collection",
latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS)); latch.await(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS));
assertEquals("CollectionStateWatcher should be removed", waitFor("CollectionStateWatcher should be removed", 1, TimeUnit.SECONDS,
1, client.getZkStateReader().getStateWatchers("currentstate").size()); () -> client.getZkStateReader().getStateWatchers("currentstate").size() == 1);
} }
@Test @Test
@ -189,9 +214,8 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
expectThrows(TimeoutException.class, () -> { expectThrows(TimeoutException.class, () -> {
client.waitForState("nosuchcollection", 1, TimeUnit.SECONDS, ((liveNodes, collectionState) -> false)); client.waitForState("nosuchcollection", 1, TimeUnit.SECONDS, ((liveNodes, collectionState) -> false));
}); });
Set<CollectionStateWatcher> watchers = client.getZkStateReader().getStateWatchers("nosuchcollection"); waitFor("Watchers for collection should be removed after timeout", 1, TimeUnit.SECONDS,
assertTrue("Watchers for collection should be removed after timeout", () -> client.getZkStateReader().getStateWatchers("nosuchcollection").isEmpty());
watchers == null || watchers.size() == 0);
} }
@ -229,18 +253,17 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
} }
@Test @Test
public void testWatcherIsRemovedAfterTimeout() { public void testWatcherIsRemovedAfterTimeout() throws Exception {
CloudSolrClient client = cluster.getSolrClient(); CloudSolrClient client = cluster.getSolrClient();
assertTrue("There should be no watchers for a non-existent collection!", assertTrue("There should be no watchers for a non-existent collection!",
client.getZkStateReader().getStateWatchers("no-such-collection") == null); client.getZkStateReader().getStateWatchers("no-such-collection").isEmpty());
expectThrows(TimeoutException.class, () -> { expectThrows(TimeoutException.class, () -> {
client.waitForState("no-such-collection", 10, TimeUnit.MILLISECONDS, (n, c) -> DocCollection.isFullyActive(n, c, 1, 1)); client.waitForState("no-such-collection", 10, TimeUnit.MILLISECONDS, (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
}); });
Set<CollectionStateWatcher> watchers = client.getZkStateReader().getStateWatchers("no-such-collection"); waitFor("Watchers for collection should be removed after timeout", 1, TimeUnit.SECONDS,
assertTrue("Watchers for collection should be removed after timeout", () -> client.getZkStateReader().getStateWatchers("no-such-collection").isEmpty());
watchers == null || watchers.size() == 0);
} }