LicensesService: Documentation & minor cleanups
Original commit: elastic/x-pack-elasticsearch@5ce349e2f5
This commit is contained in:
parent
50fb5250ee
commit
b79757bb91
|
@ -57,7 +57,7 @@ public class TransportDeleteLicenseAction extends TransportMasterNodeOperationAc
|
|||
@Override
|
||||
protected void masterOperation(final DeleteLicenseRequest request, ClusterState state, final ActionListener<DeleteLicenseResponse> listener) throws ElasticsearchException {
|
||||
final DeleteLicenseRequestHolder requestHolder = new DeleteLicenseRequestHolder(request, "delete licenses []");
|
||||
licensesManagerService.unregisterLicenses(requestHolder, new ActionListener<ClusterStateUpdateResponse>() {
|
||||
licensesManagerService.removeLicenses(requestHolder, new ActionListener<ClusterStateUpdateResponse>() {
|
||||
@Override
|
||||
public void onResponse(ClusterStateUpdateResponse clusterStateUpdateResponse) {
|
||||
listener.onResponse(new DeleteLicenseResponse(clusterStateUpdateResponse.isAcknowledged()));
|
||||
|
|
|
@ -20,13 +20,26 @@ import static org.elasticsearch.license.plugin.core.LicensesService.PutLicenseRe
|
|||
@ImplementedBy(LicensesService.class)
|
||||
public interface LicensesManagerService {
|
||||
|
||||
//TODO: documentation
|
||||
|
||||
/**
|
||||
* Registers new licenses in the cluster
|
||||
* <p/>
|
||||
* This method can be only called on the master node. It tries to create a new licenses on the master
|
||||
* and if provided license(s) is VALID it is added to cluster state metadata {@link org.elasticsearch.license.plugin.core.LicensesMetaData}
|
||||
*/
|
||||
public void registerLicenses(final PutLicenseRequestHolder requestHolder, final ActionListener<LicensesUpdateResponse> listener);
|
||||
|
||||
public void unregisterLicenses(final DeleteLicenseRequestHolder requestHolder, final ActionListener<ClusterStateUpdateResponse> listener);
|
||||
/**
|
||||
* Remove only signed license(s) for provided features from the cluster state metadata
|
||||
*/
|
||||
public void removeLicenses(final DeleteLicenseRequestHolder requestHolder, final ActionListener<ClusterStateUpdateResponse> listener);
|
||||
|
||||
/**
|
||||
* @return the set of features that are currently enabled
|
||||
*/
|
||||
public Set<String> enabledFeatures();
|
||||
|
||||
/**
|
||||
* @return a list of licenses, contains one license (with the latest expiryDate) per registered features sorted by latest issueDate
|
||||
*/
|
||||
public List<ESLicense> getLicenses();
|
||||
}
|
||||
|
|
|
@ -47,10 +47,9 @@ import static org.elasticsearch.license.core.ESLicenses.reduceAndMap;
|
|||
/**
|
||||
* Service responsible for managing {@link org.elasticsearch.license.plugin.core.LicensesMetaData}
|
||||
* Interfaces through which this is exposed are:
|
||||
* - LicensesManagerService - responsible for adding/deleting signed licenses
|
||||
* - LicensesClientService - allow interested plugins (features) to register to licensing notifications
|
||||
* - LicensesManagerService - responsible for managing signed and one-time-trial licenses
|
||||
* - LicensesClientService - responsible for feature registration and notification to consumer plugin(s)
|
||||
* <p/>
|
||||
* TODO: documentation
|
||||
*/
|
||||
@Singleton
|
||||
public class LicensesService extends AbstractLifecycleComponent<LicensesService> implements ClusterStateListener, LicensesManagerService, LicensesClientService {
|
||||
|
@ -65,12 +64,26 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
|
|||
|
||||
private final TransportService transportService;
|
||||
|
||||
/**
|
||||
* Currently active consumers to notify to
|
||||
*/
|
||||
private final List<ListenerHolder> registeredListeners = new CopyOnWriteArrayList<>();
|
||||
|
||||
/**
|
||||
* Currently pending consumers that has to be registered
|
||||
*/
|
||||
private final Queue<ListenerHolder> pendingListeners = new ConcurrentLinkedQueue<>();
|
||||
|
||||
/**
|
||||
* Currently active scheduledNotifications
|
||||
* All finished notifications will be cleared in {@link #clusterChanged(org.elasticsearch.cluster.ClusterChangedEvent)}
|
||||
* and {@link #scheduleNextNotification(long)}
|
||||
*/
|
||||
private final Queue<ScheduledFuture> scheduledNotifications = new ConcurrentLinkedQueue<>();
|
||||
|
||||
/**
|
||||
* The last licensesMetaData that has been notified by {@link #notifyFeatures(LicensesMetaData)}
|
||||
*/
|
||||
private final AtomicReference<LicensesMetaData> lastObservedLicensesState;
|
||||
|
||||
@Inject
|
||||
|
@ -81,14 +94,13 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
|
|||
this.threadPool = threadPool;
|
||||
this.transportService = transportService;
|
||||
this.lastObservedLicensesState = new AtomicReference<>(null);
|
||||
transportService.registerHandler(REGISTER_TRIAL_LICENSE_ACTION_NAME, new RegisterTrialLicenseRequestHandler());
|
||||
if (DiscoveryNode.dataNode(settings) || DiscoveryNode.masterNode(settings)) {
|
||||
transportService.registerHandler(REGISTER_TRIAL_LICENSE_ACTION_NAME, new RegisterTrialLicenseRequestHandler());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers new licenses in the cluster
|
||||
* <p/>
|
||||
* This method can be only called on the master node. It tries to create a new licenses on the master
|
||||
* and if provided license(s) is VALID it is added to cluster metadata.
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public void registerLicenses(final PutLicenseRequestHolder requestHolder, final ActionListener<LicensesUpdateResponse> listener) {
|
||||
|
@ -141,8 +153,11 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public void unregisterLicenses(final DeleteLicenseRequestHolder requestHolder, final ActionListener<ClusterStateUpdateResponse> listener) {
|
||||
public void removeLicenses(final DeleteLicenseRequestHolder requestHolder, final ActionListener<ClusterStateUpdateResponse> listener) {
|
||||
final DeleteLicenseRequest request = requestHolder.request;
|
||||
clusterService.submitStateUpdateTask(requestHolder.source, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {
|
||||
@Override
|
||||
|
@ -177,6 +192,9 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
|
|||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public Set<String> enabledFeatures() {
|
||||
Set<String> enabledFeatures = Sets.newHashSet();
|
||||
|
@ -188,6 +206,9 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
|
|||
return enabledFeatures;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public List<ESLicense> getLicenses() {
|
||||
LicensesMetaData currentMetaData = clusterService.state().metaData().custom(LicensesMetaData.TYPE);
|
||||
|
@ -239,6 +260,11 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
|
|||
return status;
|
||||
}
|
||||
|
||||
/**
|
||||
* Master-only operation to generate a one-time trial license for a feature.
|
||||
* The trial license is only generated and stored if the current cluster state metaData
|
||||
* has no signed/one-time-trial license for the feature in question
|
||||
*/
|
||||
private void registerTrialLicense(final RegisterTrialLicenseRequest request) {
|
||||
clusterService.submitStateUpdateTask("register trial license []", new ProcessedClusterStateUpdateTask() {
|
||||
@Override
|
||||
|
@ -334,19 +360,31 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
|
|||
lastObservedLicensesState.set(null);
|
||||
}
|
||||
|
||||
/**
|
||||
* When there is no global block on {@link org.elasticsearch.gateway.GatewayService#STATE_NOT_RECOVERED_BLOCK}:
|
||||
* - tries to register any {@link #pendingListeners} by calling {@link #registeredListeners}
|
||||
* - if any {@link #pendingListeners} are registered successfully or if previous cluster state had a block on
|
||||
* {@link org.elasticsearch.gateway.GatewayService#STATE_NOT_RECOVERED_BLOCK}, calls
|
||||
* {@link #notifyFeaturesAndScheduleNotification(LicensesMetaData)}
|
||||
* - else calls {@link #notifyFeaturesAndScheduleNotificationIfNeeded(LicensesMetaData)}
|
||||
*
|
||||
* clears up any finished notifications on every call
|
||||
*/
|
||||
@Override
|
||||
public void clusterChanged(ClusterChangedEvent event) {
|
||||
clearFinishedNotifications();
|
||||
if (!event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
|
||||
LicensesMetaData oldLicensesMetaData = event.previousState().getMetaData().custom(LicensesMetaData.TYPE);
|
||||
LicensesMetaData currentLicensesMetaData = event.state().getMetaData().custom(LicensesMetaData.TYPE);
|
||||
logLicenseMetaDataStats("old", oldLicensesMetaData);
|
||||
logLicenseMetaDataStats("new", currentLicensesMetaData);
|
||||
|
||||
// Check pending feature registrations and try to complete registrations
|
||||
boolean addedNewRegisteredListener = false;
|
||||
if (!pendingListeners.isEmpty()) {
|
||||
ListenerHolder pendingRegistrationLister;
|
||||
boolean masterAvailable = false;
|
||||
while ((pendingRegistrationLister = pendingListeners.poll()) != null) {
|
||||
masterAvailable = registerListener(pendingRegistrationLister);
|
||||
boolean masterAvailable = registerListener(pendingRegistrationLister);
|
||||
logger.info("trying to register pending listener for " + pendingRegistrationLister.feature + " masterAvailable: " + masterAvailable);
|
||||
if (!masterAvailable) {
|
||||
// if the master is not available do not, break out of trying pendingListeners
|
||||
|
@ -355,24 +393,21 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
|
|||
} else {
|
||||
logger.info("successfully registered listener for: " + pendingRegistrationLister.feature);
|
||||
registeredListeners.add(pendingRegistrationLister);
|
||||
// make sure to notify new registered feature
|
||||
// notifications could have been scheduled for it before it was registered
|
||||
addedNewRegisteredListener = true;
|
||||
}
|
||||
}
|
||||
if (masterAvailable) {
|
||||
// make sure to notify new registered feature
|
||||
// notifications could have been scheduled for it before it was registered
|
||||
notifyFeaturesAndScheduleNotification(currentLicensesMetaData);
|
||||
}
|
||||
}
|
||||
|
||||
clearFinishedNotifications();
|
||||
|
||||
// notify all interested plugins
|
||||
// notifyFeaturesIfNeeded will short-circuit with -1 if the currentLicensesMetaData has been notified on earlier
|
||||
// Change to debug
|
||||
logger.info("calling notifyFeaturesAndScheduleNotificationIfNeeded from clusterChanged");
|
||||
if (event.previousState().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
|
||||
if (event.previousState().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK) || addedNewRegisteredListener) {
|
||||
logger.info("calling notifyFeaturesAndScheduleNotification from clusterChanged");
|
||||
notifyFeaturesAndScheduleNotification(currentLicensesMetaData);
|
||||
} else {
|
||||
logger.info("calling notifyFeaturesAndScheduleNotificationIfNeeded from clusterChanged");
|
||||
notifyFeaturesAndScheduleNotificationIfNeeded(currentLicensesMetaData);
|
||||
}
|
||||
} else {
|
||||
|
@ -380,6 +415,10 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls {@link #notifyFeaturesAndScheduleNotification(LicensesMetaData)} with <code>currentLicensesMetaData</code>
|
||||
* if it was not already notified on
|
||||
*/
|
||||
private void notifyFeaturesAndScheduleNotificationIfNeeded(LicensesMetaData currentLicensesMetaData) {
|
||||
final LicensesMetaData lastNotifiedLicensesMetaData = lastObservedLicensesState.get();
|
||||
if (lastNotifiedLicensesMetaData != null && lastNotifiedLicensesMetaData.equals(currentLicensesMetaData)) {
|
||||
|
@ -389,6 +428,10 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
|
|||
notifyFeaturesAndScheduleNotification(currentLicensesMetaData);
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls {@link #notifyFeatures(LicensesMetaData)} with <code>currentLicensesMetaData</code>
|
||||
* if needed, also schedules the earliest expiry notification for registered feature(s)
|
||||
*/
|
||||
private void notifyFeaturesAndScheduleNotification(LicensesMetaData currentLicensesMetaData) {
|
||||
long nextScheduleFrequency = notifyFeatures(currentLicensesMetaData);
|
||||
if (nextScheduleFrequency != -1l) {
|
||||
|
@ -396,6 +439,16 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks license expiry for all the registered feature(s), upon completion
|
||||
* sets <code>currentLicensesMetaData</code> to {@link #lastObservedLicensesState}
|
||||
* to ensure the same license(s) are not notified on from
|
||||
* {@link #clusterChanged(org.elasticsearch.cluster.ClusterChangedEvent)}
|
||||
*
|
||||
* @return -1 if there are no expiring license(s) for any registered feature(s), else
|
||||
* returns the minimum of the expiry times of all the registered feature(s) to
|
||||
* schedule an expiry notification
|
||||
*/
|
||||
private long notifyFeatures(LicensesMetaData currentLicensesMetaData) {
|
||||
long nextScheduleFrequency = -1l;
|
||||
long offset = TimeValue.timeValueMillis(100).getMillis();
|
||||
|
@ -463,6 +516,9 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public void register(String feature, TrialLicenseOptions trialLicenseOptions, Listener listener) {
|
||||
final ListenerHolder listenerHolder = new ListenerHolder(feature, trialLicenseOptions, listener);
|
||||
|
@ -482,7 +538,8 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
|
|||
* then notifies features to be disabled
|
||||
*
|
||||
* @param listenerHolder of the feature to register
|
||||
* @return true if registration has been completed, false otherwise (if masterNode is not available)
|
||||
* @return true if registration has been completed, false otherwise (if masterNode is not available & trail license spec is provided
|
||||
* or if there is a global block on {@link org.elasticsearch.gateway.GatewayService#STATE_NOT_RECOVERED_BLOCK})
|
||||
*/
|
||||
private boolean registerListener(final ListenerHolder listenerHolder) {
|
||||
logger.info("Registering listener for " + listenerHolder.feature);
|
||||
|
@ -554,6 +611,10 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
|
|||
|
||||
}
|
||||
|
||||
/**
|
||||
* Clears out any completed notification future from
|
||||
* {@link #scheduledNotifications}
|
||||
*/
|
||||
private void clearFinishedNotifications() {
|
||||
while (!scheduledNotifications.isEmpty()) {
|
||||
ScheduledFuture notification = scheduledNotifications.peek();
|
||||
|
@ -571,6 +632,9 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
|
|||
return ThreadPool.Names.GENERIC;
|
||||
}
|
||||
|
||||
/**
|
||||
* Schedules an expiry notification with a delay of <code>nextScheduleDelay</code>
|
||||
*/
|
||||
private void scheduleNextNotification(long nextScheduleDelay) {
|
||||
clearFinishedNotifications();
|
||||
|
||||
|
@ -583,6 +647,12 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Job for notifying on expired license(s) to registered feature(s)
|
||||
* In case of a global block on {@link org.elasticsearch.gateway.GatewayService#STATE_NOT_RECOVERED_BLOCK},
|
||||
* the notification is not run, instead the feature(s) would be notified on the next
|
||||
* {@link #clusterChanged(org.elasticsearch.cluster.ClusterChangedEvent)} with no global block
|
||||
*/
|
||||
public class LicensingClientNotificationJob implements Runnable {
|
||||
|
||||
public LicensingClientNotificationJob() {}
|
||||
|
@ -639,6 +709,9 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stores configuration and listener for a feature
|
||||
*/
|
||||
private class ListenerHolder {
|
||||
final String feature;
|
||||
final TrialLicenseOptions trialLicenseOptions;
|
||||
|
@ -665,6 +738,10 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Thin wrapper to work with {@link org.elasticsearch.license.plugin.core.LicensesMetaData}
|
||||
* Never mutates the wrapped metaData
|
||||
*/
|
||||
private static class LicensesWrapper {
|
||||
|
||||
public static LicensesWrapper wrap(LicensesMetaData licensesMetaData) {
|
||||
|
@ -690,13 +767,15 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Request for trial license generation to master
|
||||
*/
|
||||
private static class RegisterTrialLicenseRequest extends TransportRequest {
|
||||
private int maxNodes;
|
||||
private String feature;
|
||||
private TimeValue duration;
|
||||
|
||||
private RegisterTrialLicenseRequest() {
|
||||
}
|
||||
private RegisterTrialLicenseRequest() {}
|
||||
|
||||
private RegisterTrialLicenseRequest(String feature, TimeValue duration, int maxNodes) {
|
||||
this.maxNodes = maxNodes;
|
||||
|
@ -722,6 +801,9 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Request handler for trial license generation to master
|
||||
*/
|
||||
private class RegisterTrialLicenseRequestHandler extends BaseTransportRequestHandler<RegisterTrialLicenseRequest> {
|
||||
@Override
|
||||
public RegisterTrialLicenseRequest newInstance() {
|
||||
|
|
Loading…
Reference in New Issue