SOLR-14940: Fix ReplicationHandler memory leak through SolrCore.closeHooks

* Added ability to remove SolrCore.closeHooks
* Keep references to CloseHooks in ReplicationHandler and remove them on ReplicationHandler.shutdown()

closes #1997
This commit is contained in:
Anver Sotnikov 2020-10-16 14:15:53 -04:00 committed by Mike Drob
parent 3730719ff2
commit 6d00843d97
6 changed files with 88 additions and 45 deletions

View File

@ -160,10 +160,12 @@ Optimizations
Bug Fixes
---------------------
(No changes)
* SOLR-14940: ReplicationHandler memory leak through SolrCore.closeHooks with unstable ZK connection. (Anver Sotnikov, Mike Drob)
Other Changes
---------------------
* SOLR-14954: Heavily edit reindexing.adoc (Sameul García Martínez and Erick Erickson)
================== 8.7.0 ==================

View File

@ -52,10 +52,18 @@ public class ReplicateFromLeader {
/**
* Start a replication handler thread that will periodically pull indices from the shard leader
*
* <p>This is separate from the ReplicationHandler that listens at /replication, used for recovery
* and leader actions. It is simpler to discard the entire polling ReplicationHandler rather than
* worrying about disabling polling and correctly setting all of the leader bits if we need to reset.
*
* <p>TODO: It may be cleaner to extract the polling logic and use that directly instead of creating
* what might be a fairly heavyweight instance here.
*
* @param switchTransactionLog if true, ReplicationHandler will rotate the transaction log once
* the replication is done
*/
public void startReplication(boolean switchTransactionLog) throws InterruptedException {
public void startReplication(boolean switchTransactionLog) {
try (SolrCore core = cc.getCore(coreName)) {
if (core == null) {
if (cc.isShutDown()) {

View File

@ -1302,7 +1302,7 @@ public class ZkController implements Closeable {
return replica;
}
public void startReplicationFromLeader(String coreName, boolean switchTransactionLog) throws InterruptedException {
public void startReplicationFromLeader(String coreName, boolean switchTransactionLog) {
log.info("{} starting background replication from leader", coreName);
ReplicateFromLeader replicateFromLeader = new ReplicateFromLeader(cc, coreName);
synchronized (replicateFromLeader) { // synchronize to prevent any stop before we finish the start

View File

@ -1732,6 +1732,20 @@ public final class SolrCore implements SolrInfoBean, Closeable {
closeHooks.add(hook);
}
/**
 * Unregisters a close callback hook from this core.
 *
 * <p>Does nothing when no hook collection has been created yet.
 *
 * @param hook the hook to remove
 */
public void removeCloseHook(CloseHook hook) {
  if (closeHooks == null) {
    return;
  }
  closeHooks.remove(hook);
}
/**
 * Returns a read-only view of the close hooks registered on this core. Visible for testing.
 *
 * <p>The hook collection may still be {@code null} (see the null check in
 * {@link #removeCloseHook(CloseHook)}); in that case an empty collection is returned instead of
 * letting {@link Collections#unmodifiableCollection(Collection)} throw a
 * {@link NullPointerException}.
 *
 * @return an unmodifiable view of the registered hooks, never {@code null}
 */
public Collection<CloseHook> getCloseHooks() {
  if (closeHooks == null) {
    return Collections.emptyList();
  }
  return Collections.unmodifiableCollection(closeHooks);
}
/**
* @lucene.internal Debugging aid only. No non-test code should be released with uncommented verbose() calls.
*/

View File

@ -1372,55 +1372,55 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
return Boolean.TRUE.equals( enable );
}
/**
 * Close hook for the first stage of shutdown: requests shutdown of {@code executorService}
 * before the core closes, and destroys the index fetchers after it has closed.
 */
private final CloseHook startShutdownHook = new CloseHook() {
  @Override
  public void preClose(SolrCore core) {
    // we don't wait for shutdown - this can deadlock core reload
    if (executorService != null) {
      executorService.shutdown();
    }
  }

  @Override
  public void postClose(SolrCore core) {
    if (pollingIndexFetcher != null) {
      pollingIndexFetcher.destroy();
    }
    // The current fetcher is only destroyed separately when it is a different instance
    // from the polling fetcher handled just above.
    final boolean hasDistinctCurrentFetcher =
        currentIndexFetcher != null && currentIndexFetcher != pollingIndexFetcher;
    if (hasDistinctCurrentFetcher) {
      currentIndexFetcher.destroy();
    }
  }
};
/**
 * Close hook for the restore machinery: before the core closes it shuts down
 * {@code restoreExecutor}, waits for it to terminate, and cancels any outstanding restore future.
 */
private final CloseHook finishShutdownHook = new CloseHook() {
  @Override
  public void preClose(SolrCore core) {
    ExecutorUtil.shutdownAndAwaitTermination(restoreExecutor);
    if (restoreFuture == null) {
      return;
    }
    // cancel(false): a restore that is already running is not interrupted
    restoreFuture.cancel(false);
  }

  @Override
  public void postClose(SolrCore core) {
    // nothing to release after the core has closed
  }
};
/**
 * Registers this handler's close hooks on {@code core}.
 *
 * <p>The hooks shut down {@code executorService} (without waiting), destroy the polling and
 * current index fetchers, shut down {@code restoreExecutor} and cancel {@code restoreFuture}
 * as the core closes.
 */
private void registerCloseHook() {
core.addCloseHook(new CloseHook() {
@Override
public void preClose(SolrCore core) {
if (executorService != null) executorService.shutdown(); // we don't wait for shutdown - this can deadlock core reload
}
@Override
public void postClose(SolrCore core) {
if (pollingIndexFetcher != null) {
pollingIndexFetcher.destroy();
}
// destroy the current fetcher only when it is a different instance from the polling fetcher
if (currentIndexFetcher != null && currentIndexFetcher != pollingIndexFetcher) {
currentIndexFetcher.destroy();
}
}
});
core.addCloseHook(new CloseHook() {
@Override
public void preClose(SolrCore core) {
ExecutorUtil.shutdownAndAwaitTermination(restoreExecutor);
if (restoreFuture != null) {
restoreFuture.cancel(false);
}
}
@Override
public void postClose(SolrCore core) {}
});
// Hooks held in fields can later be handed to core.removeCloseHook(...), as shutdown() does.
core.addCloseHook(startShutdownHook);
core.addCloseHook(finishShutdownHook);
}
/**
 * Shuts this handler down independently of the core being closed.
 *
 * <p>Stops the executors, destroys the index fetchers, cancels any pending restore, and finally
 * removes this handler's close hooks from {@code core}.
 */
public void shutdown() {
if (executorService != null) executorService.shutdown();
if (pollingIndexFetcher != null) {
pollingIndexFetcher.destroy();
}
if (currentIndexFetcher != null && currentIndexFetcher != pollingIndexFetcher) {
currentIndexFetcher.destroy();
}
ExecutorUtil.shutdownAndAwaitTermination(restoreExecutor);
if (restoreFuture != null) {
restoreFuture.cancel(false);
}
// Run the same cleanup the close hooks would perform when the core closes.
startShutdownHook.preClose(core);
startShutdownHook.postClose(core);
finishShutdownHook.preClose(core);
finishShutdownHook.postClose(core);
// startShutdownHook.preClose only requests shutdown; this call also awaits termination.
ExecutorUtil.shutdownAndAwaitTermination(executorService);
// Deregister the hooks so the core no longer holds references to them.
core.removeCloseHook(startShutdownHook);
core.removeCloseHook(finishShutdownHook);
}
/**

View File

@ -44,6 +44,7 @@ import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.core.SolrCore;
import org.apache.solr.util.TestInjection;
import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.KeeperException;
@ -219,12 +220,30 @@ public void testCantConnectToPullReplica() throws Exception {
addDocs(30);
waitForState("Expecting node to be disconnected", collectionName, activeReplicaCount(1, 0, 0));
addDocs(40);
waitForState("Expecting node to be disconnected", collectionName, activeReplicaCount(1, 0, 1));
waitForState("Expecting node to be reconnected", collectionName, activeReplicaCount(1, 0, 1));
try (HttpSolrClient pullReplicaClient = getHttpSolrClient(s.getReplicas(EnumSet.of(Replica.Type.PULL)).get(0).getCoreUrl())) {
assertNumDocs(40, pullReplicaClient);
}
}
/**
 * Expires the ZooKeeper session of the node hosting the PULL replica several times and checks,
 * after each reconnect, that the number of close hooks on its core stays constant.
 */
public void testCloseHooksDeletedOnReconnect() throws Exception {
  CollectionAdminRequest.createCollection(collectionName, "conf", 1, 1, 0, 1)
      .process(cluster.getSolrClient());
  addDocs(10);

  DocCollection collection = assertNumberOfReplicas(1, 0, 1, false, true);
  Slice shard = collection.getSlices().iterator().next();
  Replica pullReplica = shard.getReplicas(EnumSet.of(Replica.Type.PULL)).get(0);
  JettySolrRunner pullJetty = getJettyForReplica(pullReplica);
  SolrCore pullCore = pullJetty.getCoreContainer().getCores().iterator().next();

  for (int attempt = 0; attempt < 5; attempt++) {
    cluster.expireZkSession(pullJetty);
    waitForState("Expecting node to reconnect", collectionName, activeReplicaCount(1, 0, 1));
    // We have two active ReplicationHandler with two close hooks each, one for triggering
    // recovery and one for doing interval polling.
    // NOTE(review): that accounts for four hooks; the fifth presumably comes from another
    // component registered on the core - confirm.
    assertEquals(5, pullCore.getCloseHooks().size());
  }
}
private void assertNumDocs(int numDocs, SolrClient client, int timeoutSecs) throws InterruptedException, SolrServerException, IOException {
TimeOut t = new TimeOut(timeoutSecs, TimeUnit.SECONDS, TimeSource.NANO_TIME);
long numFound = -1;