diff --git a/src/main/java/org/elasticsearch/license/plugin/core/LicensesService.java b/src/main/java/org/elasticsearch/license/plugin/core/LicensesService.java
index 69f56e2184c..a7a110224d6 100644
--- a/src/main/java/org/elasticsearch/license/plugin/core/LicensesService.java
+++ b/src/main/java/org/elasticsearch/license/plugin/core/LicensesService.java
@@ -18,6 +18,7 @@ import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
+import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Singleton;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -50,30 +51,30 @@ import static org.elasticsearch.license.core.ESLicenses.reduceAndMap;
* - LicensesManagerService - responsible for managing signed and one-time-trial licenses
* - LicensesClientService - responsible for feature registration and notification to consumer plugin(s)
*
- *
+ *
* Notification Scheme:
- *
- * All registered feature(s) are notified using {@link #notifyFeatures(LicensesMetaData)} (depends on the current
- * {@link #registeredListeners}). It is idempotent with respect to all the feature listeners.
- *
- * The notification scheduling is done by {@link #notifyFeaturesAndScheduleNotification(LicensesMetaData)} which does the following:
- * - calls {@link #notifyFeatures(LicensesMetaData)} to notify all registered feature(s)
- * - if there is any license(s) with a future expiry date in the current cluster state:
- * - schedules a delayed {@link LicensingClientNotificationJob} on the MIN of all the expiry dates of all the registered feature(s)
- *
- * The {@link LicensingClientNotificationJob} calls {@link #notifyFeaturesAndScheduleNotification(LicensesMetaData)} to schedule
- * another delayed {@link LicensingClientNotificationJob} as stated above. It is a no-op in case of a global block on
- * {@link org.elasticsearch.gateway.GatewayService#STATE_NOT_RECOVERED_BLOCK}
- *
- * Upon successful registration of a new feature:
- * - {@link #notifyFeaturesAndScheduleNotification(LicensesMetaData)} is called
- *
- * Upon clusterChanged():
- * - {@link #notifyFeaturesAndScheduleNotification(LicensesMetaData)} is called if:
- * - new trial/signed license(s) are found in the cluster state meta data
- * - if new feature(s) are added to the registeredListener
- * - if the previous cluster state had a global block on {@link org.elasticsearch.gateway.GatewayService#STATE_NOT_RECOVERED_BLOCK}
- * - no-op in case of global block on {@link org.elasticsearch.gateway.GatewayService#STATE_NOT_RECOVERED_BLOCK}
+ *
+ * All registered feature(s) are notified using {@link #notifyFeatures(LicensesMetaData)} (depends on the current
+ * {@link #registeredListeners}). It is idempotent with respect to all the feature listeners.
+ *
+ * The notification scheduling is done by {@link #notifyFeaturesAndScheduleNotification(LicensesMetaData)} which does the following:
+ * - calls {@link #notifyFeatures(LicensesMetaData)} to notify all registered feature(s)
+ * - if there is any license(s) with a future expiry date in the current cluster state:
+ * - schedules a delayed {@link LicensingClientNotificationJob} on the MIN of all the expiry dates of all the registered feature(s)
+ *
+ * The {@link LicensingClientNotificationJob} calls {@link #notifyFeaturesAndScheduleNotification(LicensesMetaData)} to schedule
+ * another delayed {@link LicensingClientNotificationJob} as stated above. It is a no-op in case of a global block on
+ * {@link org.elasticsearch.gateway.GatewayService#STATE_NOT_RECOVERED_BLOCK}
+ *
+ * Upon successful registration of a new feature:
+ * - {@link #notifyFeaturesAndScheduleNotification(LicensesMetaData)} is called
+ *
+ * Upon clusterChanged():
+ * - {@link #notifyFeaturesAndScheduleNotification(LicensesMetaData)} is called if:
+ * - new trial/signed license(s) are found in the cluster state meta data
+ * - if new feature(s) are added to the registeredListener
+ * - if the previous cluster state had a global block on {@link org.elasticsearch.gateway.GatewayService#STATE_NOT_RECOVERED_BLOCK}
+ * - no-op in case of global block on {@link org.elasticsearch.gateway.GatewayService#STATE_NOT_RECOVERED_BLOCK}
*/
@Singleton
public class LicensesService extends AbstractLifecycleComponent implements ClusterStateListener, LicensesManagerService, LicensesClientService {
@@ -378,46 +379,37 @@ public class LicensesService extends AbstractLifecycleComponent
/**
* When there is no global block on {@link org.elasticsearch.gateway.GatewayService#STATE_NOT_RECOVERED_BLOCK}:
- * - tries to register any {@link #pendingListeners} by calling {@link #registeredListeners}
- * - if any {@link #pendingListeners} are registered successfully or if previous cluster state had a block on
- * {@link org.elasticsearch.gateway.GatewayService#STATE_NOT_RECOVERED_BLOCK}, calls
- * {@link #notifyFeaturesAndScheduleNotification(LicensesMetaData)}
- * - else calls {@link #notifyFeaturesAndScheduleNotificationIfNeeded(LicensesMetaData)}
+ * - tries to register any {@link #pendingListeners} by calling {@link #registeredListeners}
+ * - if any {@link #pendingListeners} are registered successfully or if previous cluster state had a block on
+ * {@link org.elasticsearch.gateway.GatewayService#STATE_NOT_RECOVERED_BLOCK}, calls
+ * {@link #notifyFeaturesAndScheduleNotification(LicensesMetaData)}
+ * - else calls {@link #notifyFeaturesAndScheduleNotificationIfNeeded(LicensesMetaData)}
*/
@Override
public void clusterChanged(ClusterChangedEvent event) {
- final ClusterState currentClusterState = event.state();
final ClusterState previousClusterState = event.previousState();
+ final ClusterState currentClusterState = event.state();
if (!currentClusterState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
- LicensesMetaData oldLicensesMetaData = previousClusterState.getMetaData().custom(LicensesMetaData.TYPE);
- LicensesMetaData currentLicensesMetaData = currentClusterState.getMetaData().custom(LicensesMetaData.TYPE);
+ final LicensesMetaData oldLicensesMetaData = previousClusterState.getMetaData().custom(LicensesMetaData.TYPE);
+ final LicensesMetaData currentLicensesMetaData = currentClusterState.getMetaData().custom(LicensesMetaData.TYPE);
logLicenseMetaDataStats("old", oldLicensesMetaData);
logLicenseMetaDataStats("new", currentLicensesMetaData);
- // Check pending feature registrations and try to complete registrations
- boolean addedNewRegisteredListener = false;
if (!pendingListeners.isEmpty()) {
- ListenerHolder pendingRegistrationLister;
- while ((pendingRegistrationLister = pendingListeners.poll()) != null) {
- boolean masterAvailable = registerListener(pendingRegistrationLister);
- logger.debug("trying to register pending listener for " + pendingRegistrationLister.feature + " masterAvailable: " + masterAvailable);
+ ListenerHolder pendingRegistrationListener;
+ while ((pendingRegistrationListener = pendingListeners.poll()) != null) {
+ boolean masterAvailable = registerListener(pendingRegistrationListener);
+ logger.debug("trying to register pending listener for " + pendingRegistrationListener.feature + " masterAvailable: " + masterAvailable);
if (!masterAvailable) {
// if the master is not available do not, break out of trying pendingListeners
- pendingListeners.add(pendingRegistrationLister);
+ pendingListeners.add(pendingRegistrationListener);
break;
- } else {
- logger.debug("successfully registered listener for: " + pendingRegistrationLister.feature);
- registeredListeners.add(pendingRegistrationLister);
- // make sure to notify new registered feature
- // notifications could have been scheduled for it before it was registered
- addedNewRegisteredListener = true;
}
}
}
// notify all interested plugins
- // Change to debug
- if (previousClusterState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK) || addedNewRegisteredListener) {
+ if (previousClusterState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
logger.debug("calling notifyFeaturesAndScheduleNotification from clusterChanged");
notifyFeaturesAndScheduleNotification(currentLicensesMetaData);
} else {
@@ -433,7 +425,7 @@ public class LicensesService extends AbstractLifecycleComponent
* Calls {@link #notifyFeaturesAndScheduleNotification(LicensesMetaData)} with currentLicensesMetaData
* if it was not already notified on
*/
- private void notifyFeaturesAndScheduleNotificationIfNeeded(LicensesMetaData currentLicensesMetaData) {
+ private void notifyFeaturesAndScheduleNotificationIfNeeded(final LicensesMetaData currentLicensesMetaData) {
final LicensesMetaData lastNotifiedLicensesMetaData = lastObservedLicensesState.get();
if (lastNotifiedLicensesMetaData != null && lastNotifiedLicensesMetaData.equals(currentLicensesMetaData)) {
logger.debug("currentLicensesMetaData has been already notified on");
@@ -446,7 +438,7 @@ public class LicensesService extends AbstractLifecycleComponent
* Calls {@link #notifyFeatures(LicensesMetaData)} with currentLicensesMetaData
* and schedules the earliest expiry (if any) notification for registered feature(s)
*/
- private void notifyFeaturesAndScheduleNotification(LicensesMetaData currentLicensesMetaData) {
+ private void notifyFeaturesAndScheduleNotification(final LicensesMetaData currentLicensesMetaData) {
long nextScheduleFrequency = notifyFeatures(currentLicensesMetaData);
if (nextScheduleFrequency != -1l) {
scheduleNextNotification(nextScheduleFrequency);
@@ -463,7 +455,7 @@ public class LicensesService extends AbstractLifecycleComponent
* returns the minimum of the expiry times of all the registered feature(s) to
* schedule an expiry notification
*/
- private long notifyFeatures(LicensesMetaData currentLicensesMetaData) {
+ private long notifyFeatures(final LicensesMetaData currentLicensesMetaData) {
long nextScheduleFrequency = -1l;
for (ListenerHolder listenerHolder : registeredListeners) {
long expiryDate = expiryDateForFeature(listenerHolder.feature, currentLicensesMetaData);
@@ -496,8 +488,6 @@ public class LicensesService extends AbstractLifecycleComponent
}
}
- lastObservedLicensesState.set(currentLicensesMetaData);
-
if (logger.isDebugEnabled()) {
logLicenseMetaDataStats("Setting last observed metaData", currentLicensesMetaData);
if (nextScheduleFrequency == -1l) {
@@ -507,9 +497,11 @@ public class LicensesService extends AbstractLifecycleComponent
}
}
+ lastObservedLicensesState.set(currentLicensesMetaData);
return nextScheduleFrequency;
}
+
private void logLicenseMetaDataStats(String prefix, LicensesMetaData licensesMetaData) {
if (licensesMetaData != null) {
logger.debug(prefix + " LicensesMetaData: signedLicenses: " + licensesMetaData.getSignatures().size() + " trialLicenses: " + licensesMetaData.getEncodedTrialLicenses().size());
@@ -524,23 +516,33 @@ public class LicensesService extends AbstractLifecycleComponent
@Override
public void register(String feature, TrialLicenseOptions trialLicenseOptions, Listener listener) {
final ListenerHolder listenerHolder = new ListenerHolder(feature, trialLicenseOptions, listener);
- logger.debug("add listener for: " + listenerHolder.feature + " to pending registration queue");
- pendingListeners.add(listenerHolder);
+ // don't trust the clusterState for blocks just yet!
+ final Lifecycle.State state = clusterService.lifecycleState();
+ if (state != Lifecycle.State.STARTED) {
+ pendingListeners.add(listenerHolder);
+ } else {
+ if (!registerListener(listenerHolder)) {
+ pendingListeners.add(listenerHolder);
+ }
+ }
}
/**
* Notifies new feature listener if it already has a signed license
* if new feature has a non-null trial license option, a master node request is made to generate the trial license
- * if no trial license option is specified for the feature and no signed license is found,
- * then notifies features to be disabled
- * then notifies features to be disabled
+ * then notifies features if needed
*
* @param listenerHolder of the feature to register
- * @return true if registration has been completed, false otherwise (if masterNode is not available & trail license spec is provided
+ * @return true if registration has been completed, false otherwise (if masterNode is not available & trail license spec is provided)
+ * or if there is a global block on {@link org.elasticsearch.gateway.GatewayService#STATE_NOT_RECOVERED_BLOCK}
*/
private boolean registerListener(final ListenerHolder listenerHolder) {
logger.debug("Registering listener for " + listenerHolder.feature);
- ClusterState currentState = clusterService.state();
+ final ClusterState currentState = clusterService.state();
+ if (currentState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
+ return false;
+ }
+
LicensesMetaData currentMetaData = currentState.metaData().custom(LicensesMetaData.TYPE);
if (expiryDateForFeature(listenerHolder.feature, currentMetaData) == -1l) {
// does not have any license so generate a trial license
@@ -564,17 +566,20 @@ public class LicensesService extends AbstractLifecycleComponent
return false;
}
}
+ registeredListeners.add(listenerHolder);
} else {
// notify feature as clusterChangedEvent may not happen
// as no trial or signed license has been found for feature
// Change to debug
logger.debug("Calling notifyFeaturesAndScheduleNotification [no trial license spec provided]");
+ registeredListeners.add(listenerHolder);
notifyFeaturesAndScheduleNotification(currentMetaData);
}
} else {
// signed license already found for the new registered
// feature, notify feature on registration
logger.debug("Calling notifyFeaturesAndScheduleNotification [signed/trial license available]");
+ registeredListeners.add(listenerHolder);
notifyFeaturesAndScheduleNotification(currentMetaData);
}
return true;
@@ -584,8 +589,6 @@ public class LicensesService extends AbstractLifecycleComponent
final Map effectiveLicenses = getEffectiveLicenses(currentLicensesMetaData);
ESLicense featureLicense;
if ((featureLicense = effectiveLicenses.get(feature)) != null) {
- logger.debug("effective license for " + feature + " relative expiry: " +
- TimeValue.timeValueMillis(effectiveLicenses.get(feature).expiryDate() - System.currentTimeMillis()));
return featureLicense.expiryDate();
}
logger.debug("no effective license for " + feature);
@@ -648,7 +651,8 @@ public class LicensesService extends AbstractLifecycleComponent
*/
private class LicensingClientNotificationJob implements Runnable {
- public LicensingClientNotificationJob() {}
+ public LicensingClientNotificationJob() {
+ }
@Override
public void run() {
@@ -766,7 +770,8 @@ public class LicensesService extends AbstractLifecycleComponent
private String feature;
private TimeValue duration;
- private RegisterTrialLicenseRequest() {}
+ private RegisterTrialLicenseRequest() {
+ }
private RegisterTrialLicenseRequest(String feature, TimeValue duration, int maxNodes) {
this.maxNodes = maxNodes;
diff --git a/src/test/java/org/elasticsearch/license/plugin/AbstractLicensesIntegrationTests.java b/src/test/java/org/elasticsearch/license/plugin/AbstractLicensesIntegrationTests.java
index 34ff434786e..89792500ab8 100644
--- a/src/test/java/org/elasticsearch/license/plugin/AbstractLicensesIntegrationTests.java
+++ b/src/test/java/org/elasticsearch/license/plugin/AbstractLicensesIntegrationTests.java
@@ -16,14 +16,13 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.license.core.ESLicense;
import org.elasticsearch.license.licensor.ESLicenseSigner;
-import org.elasticsearch.license.plugin.consumer.TestPluginService1;
-import org.elasticsearch.license.plugin.consumer.TestPluginService2;
+import org.elasticsearch.license.plugin.consumer.EagerLicenseRegistrationPluginService;
+import org.elasticsearch.license.plugin.consumer.LazyLicenseRegistrationPluginService;
import org.elasticsearch.license.plugin.consumer.TestPluginServiceBase;
import org.elasticsearch.license.plugin.core.LicensesManagerService;
import org.elasticsearch.license.plugin.core.LicensesMetaData;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.InternalTestCluster;
-import org.hamcrest.Matchers;
import java.util.*;
import java.util.concurrent.CountDownLatch;
@@ -115,30 +114,30 @@ public abstract class AbstractLicensesIntegrationTests extends ElasticsearchInte
}, 2, TimeUnit.SECONDS), equalTo(true));
}
- protected void assertConsumerPlugin1DisableNotification(int timeoutInSec) throws InterruptedException {
- assertConsumerPlugin1Notification(false, timeoutInSec);
+ protected void assertEagerConsumerPluginDisableNotification(int timeoutInSec) throws InterruptedException {
+ assertEagerConsumerPluginNotification(false, timeoutInSec);
}
- protected void assertConsumerPlugin1EnableNotification(int timeoutInSec) throws InterruptedException {
- assertConsumerPlugin1Notification(true, timeoutInSec);
+ protected void assertEagerConsumerPluginEnableNotification(int timeoutInSec) throws InterruptedException {
+ assertEagerConsumerPluginNotification(true, timeoutInSec);
}
- protected void assertConsumerPlugin2DisableNotification(int timeoutInSec) throws InterruptedException {
- assertConsumerPlugin2Notification(false, timeoutInSec);
+ protected void assertLazyConsumerPluginDisableNotification(int timeoutInSec) throws InterruptedException {
+ assertLazyConsumerPluginNotification(false, timeoutInSec);
}
- protected void assertConsumerPlugin2EnableNotification(int timeoutInSec) throws InterruptedException {
- assertConsumerPlugin2Notification(true, timeoutInSec);
+ protected void assertLazyConsumerPluginEnableNotification(int timeoutInSec) throws InterruptedException {
+ assertLazyConsumerPluginNotification(true, timeoutInSec);
}
- protected void assertConsumerPlugin2Notification(final boolean expectedEnabled, int timeoutInSec) throws InterruptedException {
- final List consumerPluginServices = consumerPlugin2Services();
+ protected void assertLazyConsumerPluginNotification(final boolean expectedEnabled, int timeoutInSec) throws InterruptedException {
+ final List consumerPluginServices = consumerLazyPluginServices();
assertThat("At least one instance has to be present", consumerPluginServices.size(), greaterThan(0));
assertConsumerPluginNotification(consumerPluginServices, expectedEnabled, timeoutInSec);
}
- protected void assertConsumerPlugin1Notification(final boolean expectedEnabled, int timeoutInSec) throws InterruptedException {
- final List consumerPluginServices = consumerPlugin1Services();
+ protected void assertEagerConsumerPluginNotification(final boolean expectedEnabled, int timeoutInSec) throws InterruptedException {
+ final List consumerPluginServices = consumerEagerPluginServices();
assertThat("At least one instance has to be present", consumerPluginServices.size(), greaterThan(0));
assertConsumerPluginNotification(consumerPluginServices, expectedEnabled, timeoutInSec);
}
@@ -158,19 +157,19 @@ public abstract class AbstractLicensesIntegrationTests extends ElasticsearchInte
}
- private List consumerPlugin2Services() {
+ private List consumerLazyPluginServices() {
final InternalTestCluster clients = internalCluster();
List consumerPluginServices = new ArrayList<>();
- for (TestPluginServiceBase service : clients.getDataNodeInstances(TestPluginService2.class)) {
+ for (TestPluginServiceBase service : clients.getDataNodeInstances(LazyLicenseRegistrationPluginService.class)) {
consumerPluginServices.add(service);
}
return consumerPluginServices;
}
- private List consumerPlugin1Services() {
+ private List consumerEagerPluginServices() {
final InternalTestCluster clients = internalCluster();
List consumerPluginServices = new ArrayList<>();
- for (TestPluginServiceBase service : clients.getDataNodeInstances(TestPluginService1.class)) {
+ for (TestPluginServiceBase service : clients.getDataNodeInstances(EagerLicenseRegistrationPluginService.class)) {
consumerPluginServices.add(service);
}
return consumerPluginServices;
diff --git a/src/test/java/org/elasticsearch/license/plugin/LicensesPluginIntegrationTests.java b/src/test/java/org/elasticsearch/license/plugin/LicensesPluginIntegrationTests.java
index 0d24785bc6a..5b5c0fa0a56 100644
--- a/src/test/java/org/elasticsearch/license/plugin/LicensesPluginIntegrationTests.java
+++ b/src/test/java/org/elasticsearch/license/plugin/LicensesPluginIntegrationTests.java
@@ -5,15 +5,19 @@
*/
package org.elasticsearch.license.plugin;
+import org.elasticsearch.common.base.Predicate;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.license.core.ESLicense;
-import org.elasticsearch.license.plugin.consumer.TestConsumerPlugin1;
-import org.elasticsearch.license.plugin.consumer.TestPluginService1;
+import org.elasticsearch.license.plugin.consumer.EagerLicenseRegistrationConsumerPlugin;
+import org.elasticsearch.license.plugin.consumer.EagerLicenseRegistrationPluginService;
import org.elasticsearch.license.plugin.action.put.PutLicenseRequestBuilder;
import org.elasticsearch.license.plugin.action.put.PutLicenseResponse;
+import org.elasticsearch.license.plugin.consumer.LazyLicenseRegistrationConsumerPlugin;
+import org.elasticsearch.license.plugin.consumer.LazyLicenseRegistrationPluginService;
import org.elasticsearch.license.plugin.core.LicensesStatus;
import org.junit.After;
import org.junit.Test;
@@ -25,100 +29,134 @@ import static org.hamcrest.CoreMatchers.equalTo;
@ClusterScope(scope = TEST, numDataNodes = 10, numClientNodes = 0)
public class LicensesPluginIntegrationTests extends AbstractLicensesIntegrationTests {
+ private final boolean useEagerLicenseRegistrationPlugin = randomBoolean();
+
private final int trialLicenseDurationInSeconds = 2;
- private final String FEATURE_NAME = TestPluginService1.FEATURE_NAME;
-
protected Settings nodeSettings(int nodeOrdinal) {
return ImmutableSettings.settingsBuilder()
.put(super.nodeSettings(nodeOrdinal))
- .put(TestConsumerPlugin1.NAME + ".trial_license_duration_in_seconds", trialLicenseDurationInSeconds)
- .put("plugin.types", LicensePlugin.class.getName() + "," + TestConsumerPlugin1.class.getName())
+ .put(((useEagerLicenseRegistrationPlugin) ? EagerLicenseRegistrationConsumerPlugin.NAME : LazyLicenseRegistrationConsumerPlugin.NAME)
+ + ".trial_license_duration_in_seconds", trialLicenseDurationInSeconds)
+ .putArray("plugin.types", LicensePlugin.class.getName(),
+ (useEagerLicenseRegistrationPlugin) ? EagerLicenseRegistrationConsumerPlugin.class.getName() : LazyLicenseRegistrationConsumerPlugin.class.getName())
.build();
}
@After
- public void beforeTest() throws Exception {
+ public void afterTest() throws Exception {
wipeAllLicenses();
+ assertThat(awaitBusy(new Predicate