mirror of https://github.com/apache/lucene.git
SOLR-13491 - SolrZkClient's watch wrapper no longer allows zookeeper
to hold the same watch object multiple times.
This commit is contained in:
parent
19fe85a3e9
commit
d0c1c36c91
|
@ -87,7 +87,7 @@ New Features
|
|||
|
||||
* SOLR-11558: It would be nice if the Graph section of the Cloud tab in the Admin UI could give some more
|
||||
information about the replicas of a collection (Erick Erickson)
|
||||
|
||||
|
||||
* SOLR-13468: autoscaling/suggestions should be able to give suggestions from config sent as a payload (noble)
|
||||
|
||||
* SOLR-12304: The MoreLikeThisComponent now supports the mlt.interestingTerms parameter. Previously this option was
|
||||
|
@ -108,6 +108,8 @@ Bug Fixes
|
|||
|
||||
* SOLR-12941: Fix IndexSizeTrigger to correctly work with "aboveBytes" and "splitMethod=link" parameters. (ab)
|
||||
|
||||
* SOLR-13491: SolrZkClient's watch wrapper no longer allows zookeeper to hold the same watch object multiple times.
|
||||
|
||||
Other Changes
|
||||
----------------------
|
||||
|
||||
|
|
|
@ -16,6 +16,12 @@
|
|||
*/
|
||||
package org.apache.solr.common.cloud;
|
||||
|
||||
import javax.xml.transform.OutputKeys;
|
||||
import javax.xml.transform.Source;
|
||||
import javax.xml.transform.Transformer;
|
||||
import javax.xml.transform.TransformerFactory;
|
||||
import javax.xml.transform.stream.StreamResult;
|
||||
import javax.xml.transform.stream.StreamSource;
|
||||
import java.io.Closeable;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
@ -32,13 +38,6 @@ import java.util.function.Function;
|
|||
import java.util.function.Predicate;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import javax.xml.transform.OutputKeys;
|
||||
import javax.xml.transform.Source;
|
||||
import javax.xml.transform.Transformer;
|
||||
import javax.xml.transform.TransformerFactory;
|
||||
import javax.xml.transform.stream.StreamResult;
|
||||
import javax.xml.transform.stream.StreamSource;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.common.SolrException;
|
||||
|
@ -274,29 +273,8 @@ public class SolrZkClient implements Closeable {
|
|||
* {@link #getData(String, org.apache.zookeeper.Watcher, org.apache.zookeeper.data.Stat, boolean)}.
|
||||
*/
|
||||
public Watcher wrapWatcher(final Watcher watcher) {
|
||||
if (watcher == null || watcher instanceof SolrZkWatcher) return watcher;
|
||||
|
||||
return new SolrZkWatcher() {
|
||||
@Override
|
||||
public void process(final WatchedEvent event) {
|
||||
log.debug("Submitting job to respond to event {}", event);
|
||||
try {
|
||||
if (watcher instanceof ConnectionManager) {
|
||||
zkConnManagerCallbackExecutor.submit(() -> watcher.process(event));
|
||||
} else {
|
||||
zkCallbackExecutor.submit(() -> watcher.process(event));
|
||||
}
|
||||
} catch (RejectedExecutionException e) {
|
||||
// If not a graceful shutdown
|
||||
if (!isClosed()) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private interface SolrZkWatcher extends Watcher {
|
||||
if (watcher == null || watcher instanceof ProcessWatchWithExecutor) return watcher;
|
||||
return new ProcessWatchWithExecutor(watcher);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -834,4 +812,56 @@ public class SolrZkClient implements Closeable {
|
|||
public void downloadFromZK(String zkPath, Path dir) throws IOException {
|
||||
ZkMaintenanceUtils.downloadFromZK(this, zkPath, dir);
|
||||
}
|
||||
|
||||
/**
|
||||
* Watcher wrapper that ensures that heavy implementations of process do not interfere with our ability
|
||||
* to react to other watches, but also ensures that two wrappers containing equal watches are considered
|
||||
* equal (and thus we won't accumulate multiple wrappers of the same watch).
|
||||
*/
|
||||
private final class ProcessWatchWithExecutor implements Watcher { // see below for why final.
|
||||
private final Watcher watcher;
|
||||
|
||||
ProcessWatchWithExecutor(Watcher watcher) {
|
||||
if (watcher == null) {
|
||||
throw new IllegalArgumentException("Watcher must not be null");
|
||||
}
|
||||
this.watcher = watcher;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(final WatchedEvent event) {
|
||||
log.debug("Submitting job to respond to event {}", event);
|
||||
try {
|
||||
if (watcher instanceof ConnectionManager) {
|
||||
zkConnManagerCallbackExecutor.submit(() -> watcher.process(event));
|
||||
} else {
|
||||
zkCallbackExecutor.submit(() -> watcher.process(event));
|
||||
}
|
||||
} catch (RejectedExecutionException e) {
|
||||
// If not a graceful shutdown
|
||||
if (!isClosed()) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// These overrides of hashcode/equals ensure that we don't store the same exact watch
|
||||
// multiple times in org.apache.zookeeper.ZooKeeper.ZKWatchManager.dataWatches
|
||||
// (a Map<String<Set<Watch>>). This class is marked final to avoid oddball
|
||||
// cases with sub-classes, if you need different behavior, find a new class or make
|
||||
// sure you account for the case where two diff sub-classes with different behavior
|
||||
// for process(WatchEvent) and have been created with the same watch object.
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return watcher.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj instanceof ProcessWatchWithExecutor) {
|
||||
return this.watcher.equals(((ProcessWatchWithExecutor) obj).watcher);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,17 +16,24 @@
|
|||
*/
|
||||
package org.apache.solr.common.cloud;
|
||||
|
||||
import java.io.File;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.solr.SolrTestCaseJ4;
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.cloud.AbstractZkTestCase;
|
||||
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||
import org.apache.solr.cloud.ZkTestServer;
|
||||
import org.apache.solr.util.ExternalPaths;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.WatchedEvent;
|
||||
import org.apache.zookeeper.Watcher;
|
||||
import org.apache.zookeeper.ZooDefs;
|
||||
import org.apache.zookeeper.data.ACL;
|
||||
import org.apache.zookeeper.data.Id;
|
||||
|
@ -35,23 +42,28 @@ import org.junit.Test;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class SolrZkClientTest extends SolrTestCaseJ4 {
|
||||
public class SolrZkClientTest extends SolrCloudTestCase {
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
|
||||
private static final String ROOT = "/";
|
||||
private static final String PATH = "/collections/collection1";
|
||||
|
||||
|
||||
protected ZkTestServer zkServer;
|
||||
|
||||
SolrZkClient aclClient;
|
||||
SolrZkClient credentialsClient;
|
||||
SolrZkClient defaultClient;
|
||||
|
||||
private CloudSolrClient solrClient;
|
||||
|
||||
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
|
||||
configureCluster(1)
|
||||
.addConfig("_default", new File(ExternalPaths.DEFAULT_CONFIGSET).toPath())
|
||||
.configure();
|
||||
solrClient = getCloudSolrClient(cluster.getZkServer().getZkAddress());
|
||||
|
||||
final String SCHEME = "digest";
|
||||
final String AUTH = "user:pass";
|
||||
|
||||
|
@ -59,7 +71,7 @@ public class SolrZkClientTest extends SolrTestCaseJ4 {
|
|||
log.info("ZooKeeper dataDir:" + zkDir);
|
||||
zkServer = new ZkTestServer(zkDir);
|
||||
zkServer.run();
|
||||
|
||||
|
||||
try (SolrZkClient client = new SolrZkClient(zkServer.getZkHost(), AbstractZkTestCase.TIMEOUT)) {
|
||||
// Set up chroot
|
||||
client.makePath("/solr", false, true);
|
||||
|
@ -67,7 +79,7 @@ public class SolrZkClientTest extends SolrTestCaseJ4 {
|
|||
|
||||
defaultClient = new SolrZkClient(zkServer.getZkAddress(), AbstractZkTestCase.TIMEOUT);
|
||||
defaultClient.makePath(PATH, true);
|
||||
|
||||
|
||||
aclClient = new SolrZkClient(zkServer.getZkAddress(), AbstractZkTestCase.TIMEOUT) {
|
||||
@Override
|
||||
protected ZkACLProvider createZkACLProvider() {
|
||||
|
@ -84,7 +96,7 @@ public class SolrZkClientTest extends SolrTestCaseJ4 {
|
|||
};
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
credentialsClient = new SolrZkClient(zkServer.getZkAddress(), AbstractZkTestCase.TIMEOUT) {
|
||||
@Override
|
||||
protected ZkCredentialsProvider createZkCredentialsToAddAutomatically() {
|
||||
|
@ -97,17 +109,19 @@ public class SolrZkClientTest extends SolrTestCaseJ4 {
|
|||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void tearDown() throws Exception {
|
||||
aclClient.close();
|
||||
credentialsClient.close();
|
||||
defaultClient.close();
|
||||
zkServer.shutdown();
|
||||
solrClient.close();
|
||||
cluster.shutdown();
|
||||
super.tearDown();
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Test
|
||||
@BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // annotated on: 24-Dec-2018
|
||||
public void testSimpleUpdateACLs() throws KeeperException, InterruptedException {
|
||||
|
@ -131,7 +145,56 @@ public class SolrZkClientTest extends SolrTestCaseJ4 {
|
|||
assertTrue("Default client should read unaffected paths", canRead(defaultClient, ROOT));
|
||||
assertFalse("Default client should not read secure children", canRead(defaultClient, PATH));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
// SOLR-13491
|
||||
public void testWrappingWatches() throws Exception {
|
||||
AtomicInteger calls = new AtomicInteger(0);
|
||||
Watcher watcherA = new Watcher() {
|
||||
@Override
|
||||
public void process(WatchedEvent event) {
|
||||
calls.getAndIncrement();
|
||||
}
|
||||
};
|
||||
Watcher watcherB = new Watcher() {
|
||||
@Override
|
||||
public void process(WatchedEvent event) {
|
||||
calls.getAndDecrement();
|
||||
}
|
||||
};
|
||||
Watcher wrapped1A = defaultClient.wrapWatcher(watcherA);
|
||||
Watcher wrapped2A = defaultClient.wrapWatcher(watcherA);
|
||||
Watcher wrappedB = defaultClient.wrapWatcher(watcherB);
|
||||
assertTrue(wrapped1A.equals(wrapped2A));
|
||||
assertTrue(wrapped2A.equals(wrapped1A));
|
||||
assertFalse(wrapped1A.equals(wrappedB));
|
||||
assertEquals(wrapped1A.hashCode(), wrapped2A.hashCode());
|
||||
|
||||
CollectionAdminRequest.createCollection(getSaferTestName(), "_default", 1, 1)
|
||||
.setMaxShardsPerNode(2)
|
||||
.process(solrClient);
|
||||
|
||||
CollectionAdminRequest.setCollectionProperty(getSaferTestName(),"foo", "bar")
|
||||
.process(solrClient);
|
||||
|
||||
//Thread.sleep(600000);
|
||||
|
||||
solrClient.getZkStateReader().getZkClient().getData("/collections/" + getSaferTestName() + "/collectionprops.json",wrapped1A, null,true);
|
||||
solrClient.getZkStateReader().getZkClient().getData("/collections/" + getSaferTestName() + "/collectionprops.json",wrapped2A, null,true);
|
||||
|
||||
CollectionAdminRequest.setCollectionProperty(getSaferTestName(),"baz", "bam")
|
||||
.process(solrClient);
|
||||
|
||||
assertEquals(1, calls.get()); // same wrapped watch set twice, only invoked once
|
||||
solrClient.getZkStateReader().getZkClient().getData("/collections/" + getSaferTestName() + "/collectionprops.json",wrapped1A, null,true);
|
||||
solrClient.getZkStateReader().getZkClient().getData("/collections/" + getSaferTestName() + "/collectionprops.json",wrappedB, null,true);
|
||||
|
||||
CollectionAdminRequest.setCollectionProperty(getSaferTestName(),"baz", "bang")
|
||||
.process(solrClient);
|
||||
|
||||
assertEquals(1, calls.get()); // offsetting watches, no change
|
||||
}
|
||||
|
||||
private static boolean canRead(SolrZkClient zkClient, String path) throws KeeperException, InterruptedException {
|
||||
try {
|
||||
zkClient.getData(path, null, null, true);
|
||||
|
@ -140,7 +203,7 @@ public class SolrZkClientTest extends SolrTestCaseJ4 {
|
|||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testCheckInterrupted() {
|
||||
assertFalse(Thread.currentThread().isInterrupted());
|
||||
|
@ -149,6 +212,6 @@ public class SolrZkClientTest extends SolrTestCaseJ4 {
|
|||
SolrZkClient.checkInterrupted(new InterruptedException());
|
||||
assertTrue(Thread.currentThread().isInterrupted());
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue