mirror of
https://github.com/apache/lucene.git
synced 2025-02-07 18:49:03 +00:00
SOLR-8908: Fixed OnReconnect listener management in ZkController to allow for de-registering listeners.
Here's what this commit includes: * Added the removeOnReconnectListener method to ZkController to allow OnReconnect listener implementations to de-register; avoids a memory leak * Updated ZkIndexSchemaReader to add a CloseHook to the SolrCore it supports to de-register as an OnReconnect listener * Added unit test to verify that after reloading and deleting a SolrCore in managed schema mode, the associated ZkIndexSchemaReader gets de-registered correctly
This commit is contained in:
parent
41da63eee3
commit
af8a962417
@ -590,6 +590,9 @@ Bug Fixes
|
||||
* SOLR-8870: AngularJS Query tab no longer URL-encodes the /select part of the request, fixing possible 404 issue
|
||||
when Solr is behind a proxy. Also, now supports old-style &qt param when handler not prefixed with "/" (janhoy)
|
||||
|
||||
* SOLR-8908: Fix to OnReconnect listener registration to allow listeners to deregister, such
|
||||
as when a core is reloaded or deleted to avoid a memory leak. (Timothy Potter)
|
||||
|
||||
======================= 5.5.0 =======================
|
||||
|
||||
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release
|
||||
|
@ -183,7 +183,8 @@ public final class ZkController {
|
||||
private boolean zkRunOnly = Boolean.getBoolean("zkRunOnly"); // expert
|
||||
|
||||
// keeps track of a list of objects that need to know a new ZooKeeper session was created after expiration occurred
|
||||
private List<OnReconnect> reconnectListeners = new ArrayList<OnReconnect>();
|
||||
// ref is held as a HashSet since we clone the set before notifying to avoid synchronizing too long
|
||||
private HashSet<OnReconnect> reconnectListeners = new HashSet<OnReconnect>();
|
||||
|
||||
private class RegisterCoreAsync implements Callable {
|
||||
|
||||
@ -204,6 +205,22 @@ public final class ZkController {
|
||||
}
|
||||
}
|
||||
|
||||
// notifies registered listeners after the ZK reconnect in the background
|
||||
private class OnReconnectNotifyAsync implements Callable {
|
||||
|
||||
private final OnReconnect listener;
|
||||
|
||||
OnReconnectNotifyAsync(OnReconnect listener) {
|
||||
this.listener = listener;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object call() throws Exception {
|
||||
listener.command();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientConnectTimeout, CloudConfig cloudConfig, final CurrentCoreDescriptorProvider registerOnReconnect)
|
||||
throws InterruptedException, TimeoutException, IOException {
|
||||
|
||||
@ -291,8 +308,8 @@ public final class ZkController {
|
||||
|
||||
List<CoreDescriptor> descriptors = registerOnReconnect.getCurrentDescriptors();
|
||||
// re register all descriptors
|
||||
ExecutorService executorService = (cc != null) ? cc.getCoreZkRegisterExecutorService() : null;
|
||||
if (descriptors != null) {
|
||||
ExecutorService executorService = (cc != null) ? cc.getCoreZkRegisterExecutorService() : null;
|
||||
for (CoreDescriptor descriptor : descriptors) {
|
||||
// TODO: we need to think carefully about what happens when it
|
||||
// was
|
||||
@ -315,17 +332,23 @@ public final class ZkController {
|
||||
}
|
||||
|
||||
// notify any other objects that need to know when the session was re-connected
|
||||
HashSet<OnReconnect> clonedListeners;
|
||||
synchronized (reconnectListeners) {
|
||||
for (OnReconnect listener : reconnectListeners) {
|
||||
try {
|
||||
clonedListeners = (HashSet<OnReconnect>)reconnectListeners.clone();
|
||||
}
|
||||
// the OnReconnect operation can be expensive per listener, so do that async in the background
|
||||
for (OnReconnect listener : clonedListeners) {
|
||||
try {
|
||||
if (executorService != null) {
|
||||
executorService.submit(new OnReconnectNotifyAsync(listener));
|
||||
} else {
|
||||
listener.command();
|
||||
} catch (Exception exc) {
|
||||
// not much we can do here other than warn in the log
|
||||
log.warn("Error when notifying OnReconnect listener " + listener + " after session re-connected.", exc);
|
||||
}
|
||||
} catch (Exception exc) {
|
||||
// not much we can do here other than warn in the log
|
||||
log.warn("Error when notifying OnReconnect listener " + listener + " after session re-connected.", exc);
|
||||
}
|
||||
}
|
||||
|
||||
} catch (InterruptedException e) {
|
||||
// Restore the interrupted status
|
||||
Thread.currentThread().interrupt();
|
||||
@ -2170,10 +2193,37 @@ public final class ZkController {
|
||||
if (listener != null) {
|
||||
synchronized (reconnectListeners) {
|
||||
reconnectListeners.add(listener);
|
||||
log.info("Added new OnReconnect listener "+listener);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Removed a previously registered OnReconnect listener, such as when a core is removed or reloaded.
|
||||
*/
|
||||
public void removeOnReconnectListener(OnReconnect listener) {
|
||||
if (listener != null) {
|
||||
boolean wasRemoved;
|
||||
synchronized (reconnectListeners) {
|
||||
wasRemoved = reconnectListeners.remove(listener);
|
||||
}
|
||||
if (wasRemoved) {
|
||||
log.info("Removed OnReconnect listener "+listener);
|
||||
} else {
|
||||
log.warn("Was asked to remove OnReconnect listener "+listener+
|
||||
", but remove operation did not find it in the list of registered listeners.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Set<OnReconnect> getCurrentOnReconnectListeners() {
|
||||
HashSet<OnReconnect> clonedListeners;
|
||||
synchronized (reconnectListeners) {
|
||||
clonedListeners = (HashSet<OnReconnect>)reconnectListeners.clone();
|
||||
}
|
||||
return clonedListeners;
|
||||
}
|
||||
|
||||
/**
|
||||
* Persists a config file to ZooKeeper using optimistic concurrency.
|
||||
*
|
||||
|
@ -30,6 +30,8 @@ import org.apache.solr.common.cloud.SolrZkClient;
|
||||
import org.apache.solr.common.cloud.ZkCmdExecutor;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.core.CloseHook;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.apache.solr.core.SolrConfig;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
import org.apache.solr.core.SolrResourceLoader;
|
||||
@ -373,7 +375,7 @@ public class ManagedIndexSchemaFactory extends IndexSchemaFactory implements Sol
|
||||
public void inform(SolrCore core) {
|
||||
this.core = core;
|
||||
if (loader instanceof ZkSolrResourceLoader) {
|
||||
this.zkIndexSchemaReader = new ZkIndexSchemaReader(this);
|
||||
this.zkIndexSchemaReader = new ZkIndexSchemaReader(this, core);
|
||||
ZkSolrResourceLoader zkLoader = (ZkSolrResourceLoader)loader;
|
||||
zkLoader.setZkIndexSchemaReader(this.zkIndexSchemaReader);
|
||||
} else {
|
||||
|
@ -20,6 +20,9 @@ import org.apache.solr.common.SolrException.ErrorCode;
|
||||
import org.apache.solr.common.cloud.OnReconnect;
|
||||
import org.apache.solr.common.cloud.SolrZkClient;
|
||||
import org.apache.solr.common.cloud.ZooKeeperException;
|
||||
import org.apache.solr.core.CloseHook;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.WatchedEvent;
|
||||
import org.apache.zookeeper.Watcher;
|
||||
@ -38,13 +41,34 @@ public class ZkIndexSchemaReader implements OnReconnect {
|
||||
private final ManagedIndexSchemaFactory managedIndexSchemaFactory;
|
||||
private SolrZkClient zkClient;
|
||||
private String managedSchemaPath;
|
||||
private final String uniqueCoreId; // used in equals impl to uniquely identify the core that we're dependent on
|
||||
private boolean isRemoved = false;
|
||||
|
||||
public ZkIndexSchemaReader(ManagedIndexSchemaFactory managedIndexSchemaFactory) {
|
||||
public ZkIndexSchemaReader(ManagedIndexSchemaFactory managedIndexSchemaFactory, SolrCore solrCore) {
|
||||
this.managedIndexSchemaFactory = managedIndexSchemaFactory;
|
||||
ZkSolrResourceLoader zkLoader = (ZkSolrResourceLoader)managedIndexSchemaFactory.getResourceLoader();
|
||||
this.zkClient = zkLoader.getZkController().getZkClient();
|
||||
managedSchemaPath = zkLoader.getConfigSetZkPath() + "/" + managedIndexSchemaFactory.getManagedSchemaResourceName();
|
||||
this.managedSchemaPath = zkLoader.getConfigSetZkPath() + "/" + managedIndexSchemaFactory.getManagedSchemaResourceName();
|
||||
this.uniqueCoreId = solrCore.getName()+":"+solrCore.getStartNanoTime();
|
||||
|
||||
// register a CloseHook for the core this reader is linked to, so that we can de-register the listener
|
||||
solrCore.addCloseHook(new CloseHook() {
|
||||
@Override
|
||||
public void preClose(SolrCore core) {
|
||||
CoreContainer cc = core.getCoreDescriptor().getCoreContainer();
|
||||
if (cc.isZooKeeperAware()) {
|
||||
log.info("Removing ZkIndexSchemaReader OnReconnect listener as core "+core.getName()+" is shutting down.");
|
||||
ZkIndexSchemaReader.this.isRemoved = true;
|
||||
cc.getZkController().removeOnReconnectListener(ZkIndexSchemaReader.this);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postClose(SolrCore core) {}
|
||||
});
|
||||
|
||||
createSchemaWatcher();
|
||||
|
||||
zkLoader.getZkController().addOnReconnectListener(this);
|
||||
}
|
||||
|
||||
@ -59,6 +83,11 @@ public class ZkIndexSchemaReader implements OnReconnect {
|
||||
zkClient.exists(managedSchemaPath, new Watcher() {
|
||||
@Override
|
||||
public void process(WatchedEvent event) {
|
||||
|
||||
if (ZkIndexSchemaReader.this.isRemoved) {
|
||||
return; // the core for this reader has already been removed, don't process this event
|
||||
}
|
||||
|
||||
// session events are not change events, and do not remove the watcher
|
||||
if (Event.EventType.None.equals(event.getType())) {
|
||||
return;
|
||||
@ -135,4 +164,27 @@ public class ZkIndexSchemaReader implements OnReconnect {
|
||||
log.error("Failed to update managed-schema watcher after session expiration due to: "+exc, exc);
|
||||
}
|
||||
}
|
||||
|
||||
public String getUniqueCoreId() {
|
||||
return uniqueCoreId;
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return "ZkIndexSchemaReader: "+managedSchemaPath+", uniqueCoreId: "+uniqueCoreId;
|
||||
}
|
||||
|
||||
public int hashCode() {
|
||||
return managedSchemaPath.hashCode()+uniqueCoreId.hashCode();
|
||||
}
|
||||
|
||||
// We need the uniqueCoreId which is core name + start time nanos to be the tie breaker
|
||||
// as there can be multiple ZkIndexSchemaReader instances active for the same core after
|
||||
// a reload (one is initializing and the other is being shutdown)
|
||||
public boolean equals(Object other) {
|
||||
if (other == null) return false;
|
||||
if (other == this) return true;
|
||||
if (!(other instanceof ZkIndexSchemaReader)) return false;
|
||||
ZkIndexSchemaReader that = (ZkIndexSchemaReader)other;
|
||||
return this.managedSchemaPath.equals(that.managedSchemaPath) && this.uniqueCoreId.equals(that.uniqueCoreId);
|
||||
}
|
||||
}
|
||||
|
@ -16,26 +16,16 @@
|
||||
*/
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.lucene.util.LuceneTestCase.Slow;
|
||||
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.client.solrj.request.CoreAdminRequest;
|
||||
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
|
||||
import org.apache.solr.client.solrj.response.CoreAdminResponse;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.ZkCoreNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkStateReader;
|
||||
import org.apache.solr.common.params.CollectionParams;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -65,32 +55,13 @@ public class CollectionReloadTest extends AbstractFullDistribZkTestBase {
|
||||
createCollectionRetry(testCollectionName, 1, 1, 1);
|
||||
cloudClient.setDefaultCollection(testCollectionName);
|
||||
|
||||
Replica leader = null;
|
||||
String replicaState = null;
|
||||
int timeoutSecs = 30;
|
||||
long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutSecs, TimeUnit.SECONDS);
|
||||
while (System.nanoTime() < timeout) {
|
||||
Replica tmp = null;
|
||||
try {
|
||||
tmp = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, shardId);
|
||||
} catch (Exception exc) {}
|
||||
if (tmp != null && "active".equals(tmp.getStr(ZkStateReader.STATE_PROP))) {
|
||||
leader = tmp;
|
||||
replicaState = "active";
|
||||
break;
|
||||
}
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
assertNotNull("Could not find active leader for " + shardId + " of " +
|
||||
testCollectionName + " after "+timeoutSecs+" secs; clusterState: " +
|
||||
printClusterStateInfo(testCollectionName), leader);
|
||||
Replica leader = getShardLeader(testCollectionName, shardId, 30 /* timeout secs */);
|
||||
|
||||
// reload collection and wait to see the core report it has been reloaded
|
||||
boolean wasReloaded = reloadCollection(leader, testCollectionName);
|
||||
assertTrue("Collection '"+testCollectionName+"' failed to reload within a reasonable amount of time!",
|
||||
wasReloaded);
|
||||
|
||||
|
||||
// cause session loss
|
||||
chaosMonkey.expireSession(getJettyOnPort(getReplicaPort(leader)));
|
||||
|
||||
@ -99,8 +70,9 @@ public class CollectionReloadTest extends AbstractFullDistribZkTestBase {
|
||||
Thread.sleep(15000);
|
||||
|
||||
// wait up to 15 seconds to see the replica in the active state
|
||||
timeoutSecs = 15;
|
||||
timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutSecs, TimeUnit.SECONDS);
|
||||
String replicaState = null;
|
||||
int timeoutSecs = 15;
|
||||
long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutSecs, TimeUnit.SECONDS);
|
||||
while (System.nanoTime() < timeout) {
|
||||
// state of leader should be active after session loss recovery - see SOLR-7338
|
||||
cloudClient.getZkStateReader().forceUpdateCollection(testCollectionName);
|
||||
@ -126,53 +98,4 @@ public class CollectionReloadTest extends AbstractFullDistribZkTestBase {
|
||||
|
||||
log.info("testReloadedLeaderStateAfterZkSessionLoss succeeded ... shutting down now!");
|
||||
}
|
||||
|
||||
protected boolean reloadCollection(Replica replica, String testCollectionName) throws Exception {
|
||||
ZkCoreNodeProps coreProps = new ZkCoreNodeProps(replica);
|
||||
String coreName = coreProps.getCoreName();
|
||||
boolean reloadedOk = false;
|
||||
try (HttpSolrClient client = getHttpSolrClient(coreProps.getBaseUrl())) {
|
||||
CoreAdminResponse statusResp = CoreAdminRequest.getStatus(coreName, client);
|
||||
long leaderCoreStartTime = statusResp.getStartTime(coreName).getTime();
|
||||
|
||||
Thread.sleep(1000);
|
||||
|
||||
// send reload command for the collection
|
||||
log.info("Sending RELOAD command for "+testCollectionName);
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set("action", CollectionParams.CollectionAction.RELOAD.toString());
|
||||
params.set("name", testCollectionName);
|
||||
QueryRequest request = new QueryRequest(params);
|
||||
request.setPath("/admin/collections");
|
||||
client.request(request);
|
||||
Thread.sleep(2000); // reload can take a short while
|
||||
|
||||
// verify reload is done, waiting up to 30 seconds for slow test environments
|
||||
long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
|
||||
while (System.nanoTime() < timeout) {
|
||||
statusResp = CoreAdminRequest.getStatus(coreName, client);
|
||||
long startTimeAfterReload = statusResp.getStartTime(coreName).getTime();
|
||||
if (startTimeAfterReload > leaderCoreStartTime) {
|
||||
reloadedOk = true;
|
||||
break;
|
||||
}
|
||||
// else ... still waiting to see the reloaded core report a later start time
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
}
|
||||
return reloadedOk;
|
||||
}
|
||||
|
||||
private void createCollectionRetry(String testCollectionName, int numShards, int replicationFactor, int maxShardsPerNode)
|
||||
throws SolrServerException, IOException {
|
||||
CollectionAdminResponse resp = createCollection(testCollectionName, numShards, replicationFactor, maxShardsPerNode);
|
||||
if (resp.getResponse().get("failure") != null) {
|
||||
CollectionAdminRequest.Delete req = new CollectionAdminRequest.Delete();
|
||||
req.setCollectionName(testCollectionName);
|
||||
req.process(cloudClient);
|
||||
resp = createCollection(testCollectionName, numShards, replicationFactor, maxShardsPerNode);
|
||||
if (resp.getResponse().get("failure") != null)
|
||||
fail("Could not create " + testCollectionName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -291,22 +291,6 @@ public class DistributedVersionInfoTest extends AbstractFullDistribZkTestBase {
|
||||
return vers.longValue();
|
||||
}
|
||||
|
||||
private void createCollectionRetry(String testCollectionName, int numShards, int replicationFactor, int maxShardsPerNode)
|
||||
throws SolrServerException, IOException {
|
||||
CollectionAdminResponse resp = createCollection(testCollectionName, numShards, replicationFactor, maxShardsPerNode);
|
||||
if (resp.getResponse().get("failure") != null) {
|
||||
CollectionAdminRequest.Delete req = new CollectionAdminRequest.Delete();
|
||||
req.setCollectionName(testCollectionName);
|
||||
req.process(cloudClient);
|
||||
|
||||
resp = createCollection(testCollectionName, numShards, replicationFactor, maxShardsPerNode);
|
||||
|
||||
if (resp.getResponse().get("failure") != null) {
|
||||
fail("Could not create " + testCollectionName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected void assertDocsExistInAllReplicas(List<Replica> notLeaders,
|
||||
String testCollectionName,
|
||||
int firstDocId,
|
||||
|
@ -428,22 +428,6 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
|
||||
}
|
||||
}
|
||||
|
||||
private void createCollectionRetry(String testCollectionName, int numShards, int replicationFactor, int maxShardsPerNode)
|
||||
throws SolrServerException, IOException {
|
||||
CollectionAdminResponse resp = createCollection(testCollectionName, numShards, replicationFactor, maxShardsPerNode);
|
||||
if (resp.getResponse().get("failure") != null) {
|
||||
CollectionAdminRequest.Delete req = new CollectionAdminRequest.Delete();
|
||||
req.setCollectionName(testCollectionName);
|
||||
req.process(cloudClient);
|
||||
|
||||
resp = createCollection(testCollectionName, numShards, replicationFactor, maxShardsPerNode);
|
||||
|
||||
if (resp.getResponse().get("failure") != null) {
|
||||
fail("Could not create " + testCollectionName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// test inspired by SOLR-6511
|
||||
protected void testLeaderZkSessionLoss() throws Exception {
|
||||
|
||||
|
@ -0,0 +1,155 @@
|
||||
package org.apache.solr.cloud;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
|
||||
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.common.cloud.OnReconnect;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.core.CoreContainer;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
import org.apache.solr.schema.ZkIndexSchemaReader;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
|
||||
|
||||
@SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
|
||||
public class TestOnReconnectListenerSupport extends AbstractFullDistribZkTestBase {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
|
||||
|
||||
public TestOnReconnectListenerSupport() {
|
||||
super();
|
||||
sliceCount = 2;
|
||||
fixShardCount(3);
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void initSysProperties() {
|
||||
System.setProperty("managed.schema.mutable", "false");
|
||||
System.setProperty("enable.update.log", "true");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getCloudSolrConfig() {
|
||||
return "solrconfig-managed-schema.xml";
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() throws Exception {
|
||||
waitForThingsToLevelOut(30000);
|
||||
|
||||
String testCollectionName = "c8n_onreconnect_1x1";
|
||||
String shardId = "shard1";
|
||||
createCollectionRetry(testCollectionName, 1, 1, 1);
|
||||
cloudClient.setDefaultCollection(testCollectionName);
|
||||
|
||||
Replica leader = getShardLeader(testCollectionName, shardId, 30 /* timeout secs */);
|
||||
JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader));
|
||||
|
||||
// get the ZkController for the node hosting the leader
|
||||
CoreContainer cores = leaderJetty.getCoreContainer();
|
||||
ZkController zkController = cores.getZkController();
|
||||
assertNotNull("ZkController is null", zkController);
|
||||
|
||||
String leaderCoreName = leader.getStr(CORE_NAME_PROP);
|
||||
String leaderCoreId;
|
||||
try (SolrCore leaderCore = cores.getCore(leaderCoreName)) {
|
||||
assertNotNull("SolrCore for "+leaderCoreName+" not found!", leaderCore);
|
||||
leaderCoreId = leaderCore.getName()+":"+leaderCore.getStartNanoTime();
|
||||
}
|
||||
|
||||
// verify the ZkIndexSchemaReader is a registered OnReconnect listener
|
||||
Set<OnReconnect> listeners = zkController.getCurrentOnReconnectListeners();
|
||||
assertNotNull("ZkController returned null OnReconnect listeners", listeners);
|
||||
ZkIndexSchemaReader expectedListener = null;
|
||||
for (OnReconnect listener : listeners) {
|
||||
if (listener instanceof ZkIndexSchemaReader) {
|
||||
ZkIndexSchemaReader reader = (ZkIndexSchemaReader)listener;
|
||||
if (leaderCoreId.equals(reader.getUniqueCoreId())) {
|
||||
expectedListener = reader;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
assertNotNull("ZkIndexSchemaReader for core " + leaderCoreName +
|
||||
" not registered as an OnReconnect listener and should be", expectedListener);
|
||||
|
||||
// reload the collection
|
||||
boolean wasReloaded = reloadCollection(leader, testCollectionName);
|
||||
assertTrue("Collection '" + testCollectionName + "' failed to reload within a reasonable amount of time!",
|
||||
wasReloaded);
|
||||
|
||||
// after reload, the new core should be registered as an OnReconnect listener and the old should not be
|
||||
String reloadedLeaderCoreId;
|
||||
try (SolrCore leaderCore = cores.getCore(leaderCoreName)) {
|
||||
reloadedLeaderCoreId = leaderCore.getName()+":"+leaderCore.getStartNanoTime();
|
||||
}
|
||||
|
||||
// they shouldn't be equal after reload
|
||||
assertTrue(!leaderCoreId.equals(reloadedLeaderCoreId));
|
||||
|
||||
listeners = zkController.getCurrentOnReconnectListeners();
|
||||
assertNotNull("ZkController returned null OnReconnect listeners", listeners);
|
||||
|
||||
expectedListener = null; // reset
|
||||
for (OnReconnect listener : listeners) {
|
||||
if (listener instanceof ZkIndexSchemaReader) {
|
||||
ZkIndexSchemaReader reader = (ZkIndexSchemaReader)listener;
|
||||
if (leaderCoreId.equals(reader.getUniqueCoreId())) {
|
||||
fail("Previous core "+leaderCoreId+
|
||||
" should no longer be a registered OnReconnect listener! Current listeners: "+listeners);
|
||||
} else if (reloadedLeaderCoreId.equals(reader.getUniqueCoreId())) {
|
||||
expectedListener = reader;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
assertNotNull("ZkIndexSchemaReader for core "+reloadedLeaderCoreId+
|
||||
" not registered as an OnReconnect listener and should be", expectedListener);
|
||||
|
||||
// try to clean up
|
||||
try {
|
||||
CollectionAdminRequest.deleteCollection(testCollectionName).process(cloudClient);
|
||||
} catch (Exception e) {
|
||||
// don't fail the test
|
||||
log.warn("Could not delete collection {} after test completed", testCollectionName);
|
||||
}
|
||||
|
||||
listeners = zkController.getCurrentOnReconnectListeners();
|
||||
for (OnReconnect listener : listeners) {
|
||||
if (listener instanceof ZkIndexSchemaReader) {
|
||||
ZkIndexSchemaReader reader = (ZkIndexSchemaReader)listener;
|
||||
if (reloadedLeaderCoreId.equals(reader.getUniqueCoreId())) {
|
||||
fail("Previous core "+reloadedLeaderCoreId+
|
||||
" should no longer be a registered OnReconnect listener after collection delete!");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.info("TestOnReconnectListenerSupport succeeded ... shutting down now!");
|
||||
}
|
||||
}
|
@ -16,6 +16,13 @@
|
||||
*/
|
||||
package org.apache.solr.common.cloud;
|
||||
|
||||
/**
|
||||
* Implementations are expected to implement a correct hashCode and equals
|
||||
* method needed to uniquely identify the listener as listeners are managed
|
||||
* in a Set. In addition, your listener implementation should call
|
||||
* org.apache.solr.cloud.ZkController#removeOnReconnectListener(OnReconnect)
|
||||
* when it no longer needs to be notified of ZK reconnection events.
|
||||
*/
|
||||
public interface OnReconnect {
|
||||
public void command();
|
||||
void command();
|
||||
}
|
||||
|
@ -46,9 +46,11 @@ import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||
import org.apache.solr.client.solrj.request.CoreAdminRequest;
|
||||
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||
import org.apache.solr.client.solrj.request.UpdateRequest;
|
||||
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
|
||||
import org.apache.solr.client.solrj.response.CoreAdminResponse;
|
||||
import org.apache.solr.client.solrj.response.QueryResponse;
|
||||
import org.apache.solr.client.solrj.response.RequestStatusState;
|
||||
import org.apache.solr.common.SolrDocument;
|
||||
@ -1808,6 +1810,43 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
|
||||
createCollection(collectionInfos, collName, props, client);
|
||||
}
|
||||
|
||||
protected void createCollectionRetry(String testCollectionName, int numShards, int replicationFactor, int maxShardsPerNode)
|
||||
throws SolrServerException, IOException {
|
||||
CollectionAdminResponse resp = createCollection(testCollectionName, numShards, replicationFactor, maxShardsPerNode);
|
||||
if (resp.getResponse().get("failure") != null) {
|
||||
CollectionAdminRequest.Delete req = new CollectionAdminRequest.Delete();
|
||||
req.setCollectionName(testCollectionName);
|
||||
req.process(cloudClient);
|
||||
|
||||
resp = createCollection(testCollectionName, numShards, replicationFactor, maxShardsPerNode);
|
||||
|
||||
if (resp.getResponse().get("failure") != null) {
|
||||
fail("Could not create " + testCollectionName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected Replica getShardLeader(String testCollectionName, String shardId, int timeoutSecs) throws Exception {
|
||||
Replica leader = null;
|
||||
long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutSecs, TimeUnit.SECONDS);
|
||||
while (System.nanoTime() < timeout) {
|
||||
Replica tmp = null;
|
||||
try {
|
||||
tmp = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, shardId);
|
||||
} catch (Exception exc) {}
|
||||
if (tmp != null && "active".equals(tmp.getStr(ZkStateReader.STATE_PROP))) {
|
||||
leader = tmp;
|
||||
break;
|
||||
}
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
assertNotNull("Could not find active leader for " + shardId + " of " +
|
||||
testCollectionName + " after "+timeoutSecs+" secs; clusterState: " +
|
||||
printClusterStateInfo(testCollectionName), leader);
|
||||
|
||||
return leader;
|
||||
}
|
||||
|
||||
protected List<Replica> ensureAllReplicasAreActive(String testCollectionName, String shardId, int shards, int rf, int maxWaitSecs) throws Exception {
|
||||
final RTimer timer = new RTimer();
|
||||
|
||||
@ -1891,6 +1930,42 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
|
||||
return cs;
|
||||
}
|
||||
|
||||
protected boolean reloadCollection(Replica replica, String testCollectionName) throws Exception {
|
||||
ZkCoreNodeProps coreProps = new ZkCoreNodeProps(replica);
|
||||
String coreName = coreProps.getCoreName();
|
||||
boolean reloadedOk = false;
|
||||
try (HttpSolrClient client = getHttpSolrClient(coreProps.getBaseUrl())) {
|
||||
CoreAdminResponse statusResp = CoreAdminRequest.getStatus(coreName, client);
|
||||
long leaderCoreStartTime = statusResp.getStartTime(coreName).getTime();
|
||||
|
||||
Thread.sleep(1000);
|
||||
|
||||
// send reload command for the collection
|
||||
log.info("Sending RELOAD command for "+testCollectionName);
|
||||
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||
params.set("action", CollectionParams.CollectionAction.RELOAD.toString());
|
||||
params.set("name", testCollectionName);
|
||||
QueryRequest request = new QueryRequest(params);
|
||||
request.setPath("/admin/collections");
|
||||
client.request(request);
|
||||
Thread.sleep(2000); // reload can take a short while
|
||||
|
||||
// verify reload is done, waiting up to 30 seconds for slow test environments
|
||||
long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
|
||||
while (System.nanoTime() < timeout) {
|
||||
statusResp = CoreAdminRequest.getStatus(coreName, client);
|
||||
long startTimeAfterReload = statusResp.getStartTime(coreName).getTime();
|
||||
if (startTimeAfterReload > leaderCoreStartTime) {
|
||||
reloadedOk = true;
|
||||
break;
|
||||
}
|
||||
// else ... still waiting to see the reloaded core report a later start time
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
}
|
||||
return reloadedOk;
|
||||
}
|
||||
|
||||
static RequestStatusState getRequestStateAfterCompletion(String requestId, int waitForSeconds, SolrClient client)
|
||||
throws IOException, SolrServerException {
|
||||
RequestStatusState state = null;
|
||||
|
Loading…
x
Reference in New Issue
Block a user