From 2c1f110b6bf0053cfa50608a70454d9102744511 Mon Sep 17 00:00:00 2001 From: Tomas Fernandez Lobbe Date: Mon, 2 Apr 2018 15:56:25 -0700 Subject: [PATCH] SOLR-12172: Fixed race condition in collection properties --- solr/CHANGES.txt | 3 ++ .../solr/cloud/CollectionPropsTest.java | 19 +++++++- .../solr/common/cloud/SolrZkClient.java | 2 +- .../solr/common/cloud/ZkStateReader.java | 47 ++++++++++++------- 4 files changed, 50 insertions(+), 21 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 50fe77f6fc8..f99bd39ea17 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -105,6 +105,9 @@ Bug Fixes * SOLR-9399: Delete requests do not send credentials & fails for Basic Authentication (Susheel Kumar, Aibao Luo, Nikkolay Martinov via Erick Erickson) +* SOLR-12172: Fixed race condition that could cause an invalid set of collection properties to be kept in + memory when multiple collection property changes are done in a short period of time. (Tomás Fernández Löbbe) + Optimizations ---------------------- diff --git a/solr/core/src/test/org/apache/solr/cloud/CollectionPropsTest.java b/solr/core/src/test/org/apache/solr/cloud/CollectionPropsTest.java index 01d8be77d75..5d9e4eea56a 100644 --- a/solr/core/src/test/org/apache/solr/cloud/CollectionPropsTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/CollectionPropsTest.java @@ -19,6 +19,7 @@ package org.apache.solr.cloud; import java.io.IOException; import java.lang.invoke.MethodHandles; +import java.nio.charset.StandardCharsets; import java.util.Locale; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; @@ -95,6 +96,7 @@ public class CollectionPropsTest extends SolrCloudTestCase { final CollectionPropsWatcher w = new CollectionPropsWatcher() { @Override public boolean onStateChanged(Map collectionProperties) { + log.info("collection properties changed. Now: {}", collectionProperties); return false; } }; @@ -134,17 +136,30 @@ public class CollectionPropsTest extends SolrCloudTestCase { private void waitForValue(String propertyName, String expectedValue, int timeout) throws InterruptedException { final ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader(); + Object lastValueSeen = null; for (int i = 0; i < timeout; i += 10) { final Object value = zkStateReader.getCollectionProperties(collectionName).get(propertyName); if ((expectedValue == null && value == null) || (expectedValue != null && expectedValue.equals(value))) { return; } + lastValueSeen = value; Thread.sleep(10); } + String collectionpropsInZk = null; + try { + collectionpropsInZk = new String(cluster.getZkClient().getData("/collections/" + collectionName + "/collectionprops.json", null, null, true), StandardCharsets.UTF_8); + } catch (Exception e) { + collectionpropsInZk = "Could not get file from ZooKeeper: " + e.getMessage(); + log.error("Could not get collectionprops from ZooKeeper for assertion mesage", e); + } + + String propertiesInZkReader = cluster.getSolrClient().getZkStateReader().getCollectionProperties(collectionName).toString(); - fail(String.format(Locale.ROOT, "Could not see value change after setting collection property. Name: %s, current value: %s, expected value: %s", - propertyName, zkStateReader.getCollectionProperties(collectionName).get(propertyName), expectedValue)); + fail(String.format(Locale.ROOT, "Could not see value change after setting collection property. Name: %s, current value: %s, expected value: %s. " + + "\ncollectionprops.json file in ZooKeeper: %s" + + "\nCollectionProperties in zkStateReader: %s", + propertyName, lastValueSeen, expectedValue, collectionpropsInZk, propertiesInZkReader)); } @Test diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java index bc12c4469a5..c646258f00e 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java @@ -261,7 +261,7 @@ public class SolrZkClient implements Closeable { return new SolrZkWatcher() { @Override public void process(final WatchedEvent event) { - log.debug("Submitting job to respond to event " + event); + log.debug("Submitting job to respond to event {}", event); try { if (watcher instanceof ConnectionManager) { zkConnManagerCallbackExecutor.submit(() -> watcher.process(event)); diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java index 76bf4e57714..b0b591a4721 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -16,6 +16,14 @@ */ package org.apache.solr.common.cloud; +import static java.util.Arrays.asList; +import static java.util.Collections.EMPTY_MAP; +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static java.util.Collections.emptySortedSet; +import static java.util.Collections.unmodifiableSet; +import static org.apache.solr.common.util.Utils.fromJSON; + import java.io.Closeable; import java.lang.invoke.MethodHandles; import java.util.ArrayList; @@ -43,7 +51,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.UnaryOperator; import java.util.stream.Collectors; - import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig; import org.apache.solr.common.Callable; import org.apache.solr.common.SolrException; @@ -52,6 +59,7 @@ import org.apache.solr.common.params.AutoScalingParams; import org.apache.solr.common.params.CoreAdminParams; import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.Pair; +import org.apache.solr.common.util.SolrjNamedThreadFactory; import org.apache.solr.common.util.Utils; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; @@ -61,14 +69,6 @@ import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static java.util.Arrays.asList; -import static java.util.Collections.EMPTY_MAP; -import static java.util.Collections.emptyMap; -import static java.util.Collections.emptySet; -import static java.util.Collections.emptySortedSet; -import static java.util.Collections.unmodifiableSet; -import static org.apache.solr.common.util.Utils.fromJSON; - public class ZkStateReader implements Closeable { public static final int STATE_UPDATE_DELAY = Integer.getInteger("solr.OverseerStateUpdateDelay", 2000); // delay between cloud state updates private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -172,6 +172,9 @@ public class ZkStateReader implements Closeable { private ConcurrentHashMap> collectionPropsWatches = new ConcurrentHashMap<>(); private final ExecutorService notifications = ExecutorUtil.newMDCAwareCachedThreadPool("watches"); + + /** Used to submit notifications to Collection Properties watchers in order **/ + private final ExecutorService collectionPropsNotifications = ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrjNamedThreadFactory("collectionPropsNotifications")); private static final long LAZY_CACHE_TIME = TimeUnit.NANOSECONDS.convert(STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS); @@ -760,6 +763,7 @@ public class ZkStateReader implements Closeable { public void close() { this.closed = true; notifications.shutdown(); + ExecutorUtil.shutdownAndAwaitTermination(collectionPropsNotifications); if (closeClient) { zkClient.close(); } @@ -975,8 +979,7 @@ public class ZkStateReader implements Closeable { final String znodePath = getCollectionPropsPath(collection); while (true) { try { - Stat stat = new Stat(); - byte[] data = zkClient.getData(znodePath, watcher, stat, true); + byte[] data = zkClient.getData(znodePath, watcher, null, true); return (Map) Utils.fromJSON(data); } catch (ClassCastException e) { throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to parse collection properties for collection " + collection, e); @@ -1152,10 +1155,19 @@ public class ZkStateReader implements Closeable { */ void refreshAndWatch(boolean notifyWatchers) { try { - Map properties = fetchCollectionProperties(coll, this); - watchedCollectionProps.put(coll, properties); - if (notifyWatchers) { - notifyPropsWatchers(coll, properties); + synchronized (coll) { // We only have one PropsWatcher instance per collection, so it's fine to sync on coll + Map properties = fetchCollectionProperties(coll, this); + watchedCollectionProps.put(coll, properties); + /* + * Note that if two events were fired close to each other and the second one arrived first, we would read the collectionprops.json + * twice for the same data and notify watchers (in case of notifyWatchers==true) twice for the same data, however it's guaranteed + * that after processing both events, watchedCollectionProps will have the latest data written to ZooKeeper and that the watchers + * won't be called with out of order data + * + */ + if (notifyWatchers) { + notifyPropsWatchers(coll, properties); + } } } catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) { LOG.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage()); @@ -1724,9 +1736,8 @@ public class ZkStateReader implements Closeable { private void notifyPropsWatchers(String collection, Map properties) { try { - notifications.submit(new PropsNotification(collection, properties)); - } - catch (RejectedExecutionException e) { + collectionPropsNotifications.submit(new PropsNotification(collection, properties)); + } catch (RejectedExecutionException e) { if (!closed) { LOG.error("Couldn't run collection properties notifications for {}", collection, e); }