Fix unsafe publication of invalid license enforcer (#40985)
The invalid license enforced is exposed to the cluster state update thread (via the license state listener) before the constructor has finished. This violates the JLS for safe publication of an object, and means there is a concurrency bug lurking here. This commit addresses this by avoiding publication of the invalid license enforcer before the constructor has returned.
This commit is contained in:
parent
e71db0531e
commit
ebba9393c1
|
@ -3,17 +3,19 @@
|
||||||
* or more contributor license agreements. Licensed under the Elastic License;
|
* or more contributor license agreements. Licensed under the Elastic License;
|
||||||
* you may not use this file except in compliance with the Elastic License.
|
* you may not use this file except in compliance with the Elastic License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.elasticsearch.xpack.ml;
|
package org.elasticsearch.xpack.ml;
|
||||||
|
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||||
|
import org.elasticsearch.license.LicenseStateListener;
|
||||||
import org.elasticsearch.license.XPackLicenseState;
|
import org.elasticsearch.license.XPackLicenseState;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
|
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
|
||||||
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
|
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
|
||||||
|
|
||||||
public class InvalidLicenseEnforcer {
|
public class InvalidLicenseEnforcer implements LicenseStateListener {
|
||||||
|
|
||||||
private static final Logger logger = LogManager.getLogger(InvalidLicenseEnforcer.class);
|
private static final Logger logger = LogManager.getLogger(InvalidLicenseEnforcer.class);
|
||||||
|
|
||||||
|
@ -22,17 +24,32 @@ public class InvalidLicenseEnforcer {
|
||||||
private final DatafeedManager datafeedManager;
|
private final DatafeedManager datafeedManager;
|
||||||
private final AutodetectProcessManager autodetectProcessManager;
|
private final AutodetectProcessManager autodetectProcessManager;
|
||||||
|
|
||||||
|
private volatile boolean licenseStateListenerRegistered;
|
||||||
|
|
||||||
InvalidLicenseEnforcer(XPackLicenseState licenseState, ThreadPool threadPool,
|
InvalidLicenseEnforcer(XPackLicenseState licenseState, ThreadPool threadPool,
|
||||||
DatafeedManager datafeedManager, AutodetectProcessManager autodetectProcessManager) {
|
DatafeedManager datafeedManager, AutodetectProcessManager autodetectProcessManager) {
|
||||||
this.threadPool = threadPool;
|
this.threadPool = threadPool;
|
||||||
this.licenseState = licenseState;
|
this.licenseState = licenseState;
|
||||||
this.datafeedManager = datafeedManager;
|
this.datafeedManager = datafeedManager;
|
||||||
this.autodetectProcessManager = autodetectProcessManager;
|
this.autodetectProcessManager = autodetectProcessManager;
|
||||||
licenseState.addListener(this::closeJobsAndDatafeedsIfLicenseExpired);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void closeJobsAndDatafeedsIfLicenseExpired() {
|
void listenForLicenseStateChanges() {
|
||||||
|
/*
|
||||||
|
* Registering this as a listener can not be done in the constructor because otherwise it would be unsafe publication of this. That
|
||||||
|
* is, it would expose this to another thread before the constructor had finished. Therefore, we have a dedicated method to register
|
||||||
|
* the listener that is invoked after the constructor has returned.
|
||||||
|
*/
|
||||||
|
assert licenseStateListenerRegistered == false;
|
||||||
|
licenseState.addListener(this);
|
||||||
|
licenseStateListenerRegistered = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void licenseStateChanged() {
|
||||||
|
assert licenseStateListenerRegistered;
|
||||||
if (licenseState.isMachineLearningAllowed() == false) {
|
if (licenseState.isMachineLearningAllowed() == false) {
|
||||||
|
// if the license has expired, close jobs and datafeeds
|
||||||
threadPool.generic().execute(new AbstractRunnable() {
|
threadPool.generic().execute(new AbstractRunnable() {
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Exception e) {
|
public void onFailure(Exception e) {
|
||||||
|
@ -47,4 +64,5 @@ public class InvalidLicenseEnforcer {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -461,8 +461,10 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
|
||||||
MlLifeCycleService mlLifeCycleService = new MlLifeCycleService(environment, clusterService, datafeedManager,
|
MlLifeCycleService mlLifeCycleService = new MlLifeCycleService(environment, clusterService, datafeedManager,
|
||||||
autodetectProcessManager, memoryTracker);
|
autodetectProcessManager, memoryTracker);
|
||||||
|
|
||||||
// This object's constructor attaches to the license state, so there's no need to retain another reference to it
|
// this object registers as a license state listener, and is never removed, so there's no need to retain another reference to it
|
||||||
new InvalidLicenseEnforcer(getLicenseState(), threadPool, datafeedManager, autodetectProcessManager);
|
final InvalidLicenseEnforcer enforcer =
|
||||||
|
new InvalidLicenseEnforcer(getLicenseState(), threadPool, datafeedManager, autodetectProcessManager);
|
||||||
|
enforcer.listenForLicenseStateChanges();
|
||||||
|
|
||||||
// run node startup tasks
|
// run node startup tasks
|
||||||
autodetectProcessManager.onNodeStartup();
|
autodetectProcessManager.onNodeStartup();
|
||||||
|
|
Loading…
Reference in New Issue