mirror of https://github.com/apache/lucene.git
SOLR-12172: Fixed race condition in collection properties
This commit is contained in:
parent
41a1cbe2c3
commit
2c1f110b6b
|
@ -105,6 +105,9 @@ Bug Fixes
|
||||||
* SOLR-9399: Delete requests do not send credentials & fails for Basic Authentication
|
* SOLR-9399: Delete requests do not send credentials & fails for Basic Authentication
|
||||||
(Susheel Kumar, Aibao Luo, Nikkolay Martinov via Erick Erickson)
|
(Susheel Kumar, Aibao Luo, Nikkolay Martinov via Erick Erickson)
|
||||||
|
|
||||||
|
* SOLR-12172: Fixed race condition that could cause an invalid set of collection properties to be kept in
|
||||||
|
memory when multiple collection property changes are made in a short period of time. (Tomás Fernández Löbbe)
|
||||||
|
|
||||||
Optimizations
|
Optimizations
|
||||||
----------------------
|
----------------------
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.solr.cloud;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.invoke.MethodHandles;
|
import java.lang.invoke.MethodHandles;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
@ -95,6 +96,7 @@ public class CollectionPropsTest extends SolrCloudTestCase {
|
||||||
final CollectionPropsWatcher w = new CollectionPropsWatcher() {
|
final CollectionPropsWatcher w = new CollectionPropsWatcher() {
|
||||||
@Override
|
@Override
|
||||||
public boolean onStateChanged(Map<String,String> collectionProperties) {
|
public boolean onStateChanged(Map<String,String> collectionProperties) {
|
||||||
|
log.info("collection properties changed. Now: {}", collectionProperties);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -134,17 +136,30 @@ public class CollectionPropsTest extends SolrCloudTestCase {
|
||||||
private void waitForValue(String propertyName, String expectedValue, int timeout) throws InterruptedException {
|
private void waitForValue(String propertyName, String expectedValue, int timeout) throws InterruptedException {
|
||||||
final ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
|
final ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
|
||||||
|
|
||||||
|
Object lastValueSeen = null;
|
||||||
for (int i = 0; i < timeout; i += 10) {
|
for (int i = 0; i < timeout; i += 10) {
|
||||||
final Object value = zkStateReader.getCollectionProperties(collectionName).get(propertyName);
|
final Object value = zkStateReader.getCollectionProperties(collectionName).get(propertyName);
|
||||||
if ((expectedValue == null && value == null) ||
|
if ((expectedValue == null && value == null) ||
|
||||||
(expectedValue != null && expectedValue.equals(value))) {
|
(expectedValue != null && expectedValue.equals(value))) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
lastValueSeen = value;
|
||||||
Thread.sleep(10);
|
Thread.sleep(10);
|
||||||
}
|
}
|
||||||
|
String collectionpropsInZk = null;
|
||||||
|
try {
|
||||||
|
collectionpropsInZk = new String(cluster.getZkClient().getData("/collections/" + collectionName + "/collectionprops.json", null, null, true), StandardCharsets.UTF_8);
|
||||||
|
} catch (Exception e) {
|
||||||
|
collectionpropsInZk = "Could not get file from ZooKeeper: " + e.getMessage();
|
||||||
|
log.error("Could not get collectionprops from ZooKeeper for assertion mesage", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
String propertiesInZkReader = cluster.getSolrClient().getZkStateReader().getCollectionProperties(collectionName).toString();
|
||||||
|
|
||||||
fail(String.format(Locale.ROOT, "Could not see value change after setting collection property. Name: %s, current value: %s, expected value: %s",
|
fail(String.format(Locale.ROOT, "Could not see value change after setting collection property. Name: %s, current value: %s, expected value: %s. " +
|
||||||
propertyName, zkStateReader.getCollectionProperties(collectionName).get(propertyName), expectedValue));
|
"\ncollectionprops.json file in ZooKeeper: %s" +
|
||||||
|
"\nCollectionProperties in zkStateReader: %s",
|
||||||
|
propertyName, lastValueSeen, expectedValue, collectionpropsInZk, propertiesInZkReader));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -261,7 +261,7 @@ public class SolrZkClient implements Closeable {
|
||||||
return new SolrZkWatcher() {
|
return new SolrZkWatcher() {
|
||||||
@Override
|
@Override
|
||||||
public void process(final WatchedEvent event) {
|
public void process(final WatchedEvent event) {
|
||||||
log.debug("Submitting job to respond to event " + event);
|
log.debug("Submitting job to respond to event {}", event);
|
||||||
try {
|
try {
|
||||||
if (watcher instanceof ConnectionManager) {
|
if (watcher instanceof ConnectionManager) {
|
||||||
zkConnManagerCallbackExecutor.submit(() -> watcher.process(event));
|
zkConnManagerCallbackExecutor.submit(() -> watcher.process(event));
|
||||||
|
|
|
@ -16,6 +16,14 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.solr.common.cloud;
|
package org.apache.solr.common.cloud;
|
||||||
|
|
||||||
|
import static java.util.Arrays.asList;
|
||||||
|
import static java.util.Collections.EMPTY_MAP;
|
||||||
|
import static java.util.Collections.emptyMap;
|
||||||
|
import static java.util.Collections.emptySet;
|
||||||
|
import static java.util.Collections.emptySortedSet;
|
||||||
|
import static java.util.Collections.unmodifiableSet;
|
||||||
|
import static org.apache.solr.common.util.Utils.fromJSON;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.lang.invoke.MethodHandles;
|
import java.lang.invoke.MethodHandles;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -43,7 +51,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.function.UnaryOperator;
|
import java.util.function.UnaryOperator;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
|
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
|
||||||
import org.apache.solr.common.Callable;
|
import org.apache.solr.common.Callable;
|
||||||
import org.apache.solr.common.SolrException;
|
import org.apache.solr.common.SolrException;
|
||||||
|
@ -52,6 +59,7 @@ import org.apache.solr.common.params.AutoScalingParams;
|
||||||
import org.apache.solr.common.params.CoreAdminParams;
|
import org.apache.solr.common.params.CoreAdminParams;
|
||||||
import org.apache.solr.common.util.ExecutorUtil;
|
import org.apache.solr.common.util.ExecutorUtil;
|
||||||
import org.apache.solr.common.util.Pair;
|
import org.apache.solr.common.util.Pair;
|
||||||
|
import org.apache.solr.common.util.SolrjNamedThreadFactory;
|
||||||
import org.apache.solr.common.util.Utils;
|
import org.apache.solr.common.util.Utils;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
import org.apache.zookeeper.WatchedEvent;
|
import org.apache.zookeeper.WatchedEvent;
|
||||||
|
@ -61,14 +69,6 @@ import org.apache.zookeeper.data.Stat;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import static java.util.Arrays.asList;
|
|
||||||
import static java.util.Collections.EMPTY_MAP;
|
|
||||||
import static java.util.Collections.emptyMap;
|
|
||||||
import static java.util.Collections.emptySet;
|
|
||||||
import static java.util.Collections.emptySortedSet;
|
|
||||||
import static java.util.Collections.unmodifiableSet;
|
|
||||||
import static org.apache.solr.common.util.Utils.fromJSON;
|
|
||||||
|
|
||||||
public class ZkStateReader implements Closeable {
|
public class ZkStateReader implements Closeable {
|
||||||
public static final int STATE_UPDATE_DELAY = Integer.getInteger("solr.OverseerStateUpdateDelay", 2000); // delay between cloud state updates
|
public static final int STATE_UPDATE_DELAY = Integer.getInteger("solr.OverseerStateUpdateDelay", 2000); // delay between cloud state updates
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||||
|
@ -172,6 +172,9 @@ public class ZkStateReader implements Closeable {
|
||||||
private ConcurrentHashMap<String, CollectionWatch<CollectionPropsWatcher>> collectionPropsWatches = new ConcurrentHashMap<>();
|
private ConcurrentHashMap<String, CollectionWatch<CollectionPropsWatcher>> collectionPropsWatches = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
private final ExecutorService notifications = ExecutorUtil.newMDCAwareCachedThreadPool("watches");
|
private final ExecutorService notifications = ExecutorUtil.newMDCAwareCachedThreadPool("watches");
|
||||||
|
|
||||||
|
/** Used to submit notifications to Collection Properties watchers in order **/
|
||||||
|
private final ExecutorService collectionPropsNotifications = ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrjNamedThreadFactory("collectionPropsNotifications"));
|
||||||
|
|
||||||
private static final long LAZY_CACHE_TIME = TimeUnit.NANOSECONDS.convert(STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS);
|
private static final long LAZY_CACHE_TIME = TimeUnit.NANOSECONDS.convert(STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
|
@ -760,6 +763,7 @@ public class ZkStateReader implements Closeable {
|
||||||
public void close() {
|
public void close() {
|
||||||
this.closed = true;
|
this.closed = true;
|
||||||
notifications.shutdown();
|
notifications.shutdown();
|
||||||
|
ExecutorUtil.shutdownAndAwaitTermination(collectionPropsNotifications);
|
||||||
if (closeClient) {
|
if (closeClient) {
|
||||||
zkClient.close();
|
zkClient.close();
|
||||||
}
|
}
|
||||||
|
@ -975,8 +979,7 @@ public class ZkStateReader implements Closeable {
|
||||||
final String znodePath = getCollectionPropsPath(collection);
|
final String znodePath = getCollectionPropsPath(collection);
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
Stat stat = new Stat();
|
byte[] data = zkClient.getData(znodePath, watcher, null, true);
|
||||||
byte[] data = zkClient.getData(znodePath, watcher, stat, true);
|
|
||||||
return (Map<String, String>) Utils.fromJSON(data);
|
return (Map<String, String>) Utils.fromJSON(data);
|
||||||
} catch (ClassCastException e) {
|
} catch (ClassCastException e) {
|
||||||
throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to parse collection properties for collection " + collection, e);
|
throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to parse collection properties for collection " + collection, e);
|
||||||
|
@ -1152,10 +1155,19 @@ public class ZkStateReader implements Closeable {
|
||||||
*/
|
*/
|
||||||
void refreshAndWatch(boolean notifyWatchers) {
|
void refreshAndWatch(boolean notifyWatchers) {
|
||||||
try {
|
try {
|
||||||
Map<String, String> properties = fetchCollectionProperties(coll, this);
|
synchronized (coll) { // We only have one PropsWatcher instance per collection, so it's fine to sync on coll
|
||||||
watchedCollectionProps.put(coll, properties);
|
Map<String, String> properties = fetchCollectionProperties(coll, this);
|
||||||
if (notifyWatchers) {
|
watchedCollectionProps.put(coll, properties);
|
||||||
notifyPropsWatchers(coll, properties);
|
/*
|
||||||
|
* Note that if two events were fired close to each other and the second one arrived first, we would read collectionprops.json
|
||||||
|
* twice for the same data and notify watchers (in case of notifyWatchers==true) twice for the same data. However, it is guaranteed
|
||||||
|
* that after processing both events, watchedCollectionProps will hold the latest data written to ZooKeeper and that the watchers
|
||||||
|
* won't be called with out-of-order data.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
if (notifyWatchers) {
|
||||||
|
notifyPropsWatchers(coll, properties);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) {
|
} catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) {
|
||||||
LOG.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage());
|
LOG.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage());
|
||||||
|
@ -1724,9 +1736,8 @@ public class ZkStateReader implements Closeable {
|
||||||
|
|
||||||
private void notifyPropsWatchers(String collection, Map<String, String> properties) {
|
private void notifyPropsWatchers(String collection, Map<String, String> properties) {
|
||||||
try {
|
try {
|
||||||
notifications.submit(new PropsNotification(collection, properties));
|
collectionPropsNotifications.submit(new PropsNotification(collection, properties));
|
||||||
}
|
} catch (RejectedExecutionException e) {
|
||||||
catch (RejectedExecutionException e) {
|
|
||||||
if (!closed) {
|
if (!closed) {
|
||||||
LOG.error("Couldn't run collection properties notifications for {}", collection, e);
|
LOG.error("Couldn't run collection properties notifications for {}", collection, e);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue