Remove license state listeners on closables (#36308)
We have a few places where we register license state listeners on transient components (i.e., resources that can be open and closed during the lifecycle of the server). In one case (the opt-out query cache) we were never removing the registered listener, effectively a terrible memory leak. In another case, we were not un-registered the listener that we registered, since we were not referencing the same instance of Runnable. This commit does two things: - introduces a marker interface LicenseStateListener so that it is easier to identify these listeners in the codebase and avoid classes that need to register a license state listener from having to implement Runnable which carries a different semantic meaning than we want here - fixes the two places where we are currently leaking license state listeners
This commit is contained in:
parent
adc8355c5d
commit
d4d3a3e467
|
@ -0,0 +1,22 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.license;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
|
||||
/**
|
||||
* Marker interface for callbacks that are invoked when the license state changes.
|
||||
*/
|
||||
@FunctionalInterface
|
||||
public interface LicenseStateListener {
|
||||
|
||||
/**
|
||||
* Callback when the license state changes. See {@link XPackLicenseState#update(License.OperationMode, boolean, Version)}.
|
||||
*/
|
||||
void licenseStateChanged();
|
||||
|
||||
}
|
|
@ -266,7 +266,7 @@ public class XPackLicenseState {
|
|||
}
|
||||
}
|
||||
|
||||
private final List<Runnable> listeners;
|
||||
private final List<LicenseStateListener> listeners;
|
||||
private final boolean isSecurityEnabled;
|
||||
private final boolean isSecurityExplicitlyEnabled;
|
||||
|
||||
|
@ -315,17 +315,17 @@ public class XPackLicenseState {
|
|||
}
|
||||
}
|
||||
}
|
||||
listeners.forEach(Runnable::run);
|
||||
listeners.forEach(LicenseStateListener::licenseStateChanged);
|
||||
}
|
||||
|
||||
/** Add a listener to be notified on license change */
|
||||
public void addListener(Runnable runnable) {
|
||||
listeners.add(Objects.requireNonNull(runnable));
|
||||
public void addListener(final LicenseStateListener listener) {
|
||||
listeners.add(Objects.requireNonNull(listener));
|
||||
}
|
||||
|
||||
/** Remove a listener */
|
||||
public void removeListener(Runnable runnable) {
|
||||
listeners.remove(runnable);
|
||||
public void removeListener(final LicenseStateListener listener) {
|
||||
listeners.remove(Objects.requireNonNull(listener));
|
||||
}
|
||||
|
||||
/** Return the current license type. */
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.elasticsearch.common.xcontent.XContentType;
|
|||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.ingest.IngestMetadata;
|
||||
import org.elasticsearch.ingest.PipelineConfiguration;
|
||||
import org.elasticsearch.license.LicenseStateListener;
|
||||
import org.elasticsearch.license.XPackLicenseState;
|
||||
import org.elasticsearch.protocol.xpack.watcher.DeleteWatchRequest;
|
||||
import org.elasticsearch.protocol.xpack.watcher.PutWatchRequest;
|
||||
|
@ -78,7 +79,7 @@ import static org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplat
|
|||
import static org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils.pipelineName;
|
||||
import static org.elasticsearch.xpack.monitoring.Monitoring.CLEAN_WATCHER_HISTORY;
|
||||
|
||||
public class LocalExporter extends Exporter implements ClusterStateListener, CleanerService.Listener {
|
||||
public class LocalExporter extends Exporter implements ClusterStateListener, CleanerService.Listener, LicenseStateListener {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(LocalExporter.class);
|
||||
|
||||
|
@ -106,9 +107,10 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
|
|||
this.clusterAlertBlacklist = ClusterAlertsUtil.getClusterAlertsBlacklist(config);
|
||||
this.cleanerService = cleanerService;
|
||||
this.dateTimeFormatter = dateTimeFormatter(config);
|
||||
// if additional listeners are added here, adjust LocalExporterTests#testLocalExporterRemovesListenersOnClose accordingly
|
||||
clusterService.addListener(this);
|
||||
cleanerService.add(this);
|
||||
licenseState.addListener(this::licenseChanged);
|
||||
licenseState.addListener(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -121,7 +123,8 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
|
|||
/**
|
||||
* When the license changes, we need to ensure that Watcher is setup properly.
|
||||
*/
|
||||
private void licenseChanged() {
|
||||
@Override
|
||||
public void licenseStateChanged() {
|
||||
watcherSetup.set(false);
|
||||
}
|
||||
|
||||
|
@ -153,7 +156,7 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
|
|||
// we also remove the listener in resolveBulk after we get to RUNNING, but it's okay to double-remove
|
||||
clusterService.removeListener(this);
|
||||
cleanerService.remove(this);
|
||||
licenseState.removeListener(this::licenseChanged);
|
||||
licenseState.removeListener(this);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,37 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.monitoring.exporter.local;
|
||||
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.license.XPackLicenseState;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.monitoring.cleaner.CleanerService;
|
||||
import org.elasticsearch.xpack.monitoring.exporter.Exporter;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
public class LocalExporterTests extends ESTestCase {
|
||||
|
||||
public void testLocalExporterRemovesListenersOnClose() {
|
||||
final ClusterService clusterService = mock(ClusterService.class);
|
||||
final XPackLicenseState licenseState = mock(XPackLicenseState.class);
|
||||
final Exporter.Config config = new Exporter.Config("name", "type", Settings.EMPTY, clusterService, licenseState);
|
||||
final CleanerService cleanerService = mock(CleanerService.class);
|
||||
final LocalExporter exporter = new LocalExporter(config, mock(Client.class), cleanerService);
|
||||
verify(clusterService).addListener(exporter);
|
||||
verify(cleanerService).add(exporter);
|
||||
verify(licenseState).addListener(exporter);
|
||||
exporter.close();
|
||||
verify(clusterService).removeListener(exporter);
|
||||
verify(cleanerService).remove(exporter);
|
||||
verify(licenseState).removeListener(exporter);
|
||||
}
|
||||
|
||||
}
|
|
@ -14,6 +14,7 @@ import org.elasticsearch.index.AbstractIndexComponent;
|
|||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.cache.query.QueryCache;
|
||||
import org.elasticsearch.indices.IndicesQueryCache;
|
||||
import org.elasticsearch.license.LicenseStateListener;
|
||||
import org.elasticsearch.license.XPackLicenseState;
|
||||
import org.elasticsearch.xpack.core.security.authz.AuthorizationServiceField;
|
||||
import org.elasticsearch.xpack.core.security.authz.accesscontrol.IndicesAccessControl;
|
||||
|
@ -26,7 +27,7 @@ import java.util.Set;
|
|||
* Opts out of the query cache if field level security is active for the current request,
|
||||
* and its unsafe to cache.
|
||||
*/
|
||||
public final class OptOutQueryCache extends AbstractIndexComponent implements QueryCache {
|
||||
public final class OptOutQueryCache extends AbstractIndexComponent implements LicenseStateListener, QueryCache {
|
||||
|
||||
private final IndicesQueryCache indicesQueryCache;
|
||||
private final ThreadContext context;
|
||||
|
@ -43,14 +44,20 @@ public final class OptOutQueryCache extends AbstractIndexComponent implements Qu
|
|||
this.context = Objects.requireNonNull(context, "threadContext must not be null");
|
||||
this.indexName = indexSettings.getIndex().getName();
|
||||
this.licenseState = Objects.requireNonNull(licenseState, "licenseState");
|
||||
licenseState.addListener(() -> this.clear("license state changed"));
|
||||
licenseState.addListener(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws ElasticsearchException {
|
||||
licenseState.removeListener(this);
|
||||
clear("close");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void licenseStateChanged() {
|
||||
clear("license state changed");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear(String reason) {
|
||||
logger.debug("full cache clear, reason [{}]", reason);
|
||||
|
|
|
@ -11,8 +11,8 @@ import org.apache.lucene.index.Term;
|
|||
import org.apache.lucene.search.BooleanClause;
|
||||
import org.apache.lucene.search.BooleanQuery;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.ScoreMode;
|
||||
import org.apache.lucene.search.QueryCachingPolicy;
|
||||
import org.apache.lucene.search.ScoreMode;
|
||||
import org.apache.lucene.search.TermQuery;
|
||||
import org.apache.lucene.search.Weight;
|
||||
import org.apache.lucene.store.Directory;
|
||||
|
@ -184,6 +184,22 @@ public class OptOutQueryCacheTests extends ESTestCase {
|
|||
verify(indicesQueryCache).doCache(same(weight), same(policy));
|
||||
}
|
||||
|
||||
public void testOptOutQueryCacheRemovesLicenseStateListenerOnClose() {
|
||||
final Settings.Builder settings = Settings.builder()
|
||||
.put("index.version.created", Version.CURRENT)
|
||||
.put("index.number_of_shards", 1)
|
||||
.put("index.number_of_replicas", 0);
|
||||
final IndexMetaData indexMetaData = IndexMetaData.builder("index").settings(settings).build();
|
||||
final IndexSettings indexSettings = new IndexSettings(indexMetaData, Settings.EMPTY);
|
||||
final IndicesQueryCache indicesQueryCache = mock(IndicesQueryCache.class);
|
||||
final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
||||
final XPackLicenseState licenseState = mock(XPackLicenseState.class);
|
||||
final OptOutQueryCache cache = new OptOutQueryCache(indexSettings, indicesQueryCache, threadContext, licenseState);
|
||||
verify(licenseState).addListener(cache);
|
||||
cache.close();
|
||||
verify(licenseState).removeListener(cache);
|
||||
}
|
||||
|
||||
private static FieldPermissionsDefinition fieldPermissionDef(String[] granted, String[] denied) {
|
||||
return new FieldPermissionsDefinition(granted, denied);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue