diff --git a/plugin/src/main/java/org/elasticsearch/license/plugin/core/LicensesService.java b/plugin/src/main/java/org/elasticsearch/license/plugin/core/LicensesService.java index 032a747363d..aac17e6ba10 100644 --- a/plugin/src/main/java/org/elasticsearch/license/plugin/core/LicensesService.java +++ b/plugin/src/main/java/org/elasticsearch/license/plugin/core/LicensesService.java @@ -894,13 +894,13 @@ public class LicensesService extends AbstractLifecycleComponent } } - private Runnable triggerJob() { + private Runnable triggerJob(final ExpirationCallback callback) { return new Runnable() { @Override public void run() { LicensesMetaData currentLicensesMetaData = clusterService.state().metaData().custom(LicensesMetaData.TYPE); final Map effectiveLicenses = getEffectiveLicenses(currentLicensesMetaData); - triggerEvents(effectiveLicenses.get(feature), System.currentTimeMillis()); + triggerEvent(effectiveLicenses.get(feature), System.currentTimeMillis(), callback); } }; } @@ -935,7 +935,7 @@ public class LicensesService extends AbstractLifecycleComponent delay = expiryDuration - expirationCallback.max().getMillis(); break; case POST: - if (expiryDuration > 0l) { + if (expiryDuration >= 0l) { delay = expiryDuration + expirationCallback.min().getMillis(); } else { delay = (-1l * expiryDuration) - expirationCallback.min().getMillis(); @@ -943,24 +943,39 @@ public class LicensesService extends AbstractLifecycleComponent break; } if (delay > 0l) { - notificationQueue.add(threadPool.schedule(TimeValue.timeValueMillis(delay), executorName(), triggerJob())); + if (logger.isDebugEnabled()) { + logger.debug("Adding first notification for: orientation: " + expirationCallback.orientation().name() + + " min: " + expirationCallback.min() + + " max: " + expirationCallback.max() + + " with delay: " + TimeValue.timeValueMillis(delay) + + " license expiry duration: " + TimeValue.timeValueMillis(expiryDuration)); + } + notificationQueue.add(threadPool.schedule(TimeValue.timeValueMillis(delay), executorName(), triggerJob(expirationCallback))); } } } if (license != null) { // schedule first event of callbacks that match - triggerEvents(license, now); + logger.debug("Calling TRIGGER_EVENTS with license for " + license.feature() + " expiry: " + license.expiryDate()); + for (ExpirationCallback expirationCallback : expirationCallbacks) { + triggerEvent(license, now, expirationCallback); + } } } - private void triggerEvents(final License license, long now) { - for (ExpirationCallback expirationCallback : expirationCallbacks) { - if (expirationCallback.matches(license.expiryDate(), now)) { - long expiryDuration = license.expiryDate() - now; - boolean expired = expiryDuration <= 0l; - expirationCallback.on(license, new ExpirationStatus(expired, TimeValue.timeValueMillis((!expired) ? expiryDuration : (-1l * expiryDuration)))); - notificationQueue.add(threadPool.schedule(expirationCallback.frequency(), executorName(), triggerJob())); + private void triggerEvent(final License license, long now, final ExpirationCallback expirationCallback) { + if (expirationCallback.matches(license.expiryDate(), now)) { + long expiryDuration = license.expiryDate() - now; + boolean expired = expiryDuration <= 0l; + if (logger.isDebugEnabled()) { + logger.debug("Calling notification on: orientation: " + expirationCallback.orientation().name() + + " min: " + expirationCallback.min() + + " max: " + expirationCallback.max() + + " scheduled after: " + expirationCallback.frequency().getMillis() + + " next interval match: " + expirationCallback.matches(license.expiryDate(), System.currentTimeMillis() + expirationCallback.frequency().getMillis())); } + expirationCallback.on(license, new ExpirationStatus(expired, TimeValue.timeValueMillis((!expired) ? expiryDuration : (-1l * expiryDuration)))); + notificationQueue.add(threadPool.schedule(expirationCallback.frequency(), executorName(), triggerJob(expirationCallback))); } } diff --git a/plugin/src/test/java/org/elasticsearch/license/plugin/AbstractLicensesIntegrationTests.java b/plugin/src/test/java/org/elasticsearch/license/plugin/AbstractLicensesIntegrationTests.java index 310deda38f9..f47391aaba7 100644 --- a/plugin/src/test/java/org/elasticsearch/license/plugin/AbstractLicensesIntegrationTests.java +++ b/plugin/src/test/java/org/elasticsearch/license/plugin/AbstractLicensesIntegrationTests.java @@ -117,7 +117,7 @@ public abstract class AbstractLicensesIntegrationTests extends ElasticsearchInte } return true; } - }, 2, TimeUnit.SECONDS), equalTo(true)); + }, 5, TimeUnit.SECONDS), equalTo(true)); } protected void assertEagerConsumerPluginDisableNotification(int timeoutInSec) throws InterruptedException { diff --git a/plugin/src/test/java/org/elasticsearch/license/plugin/LicensesClientServiceTests.java b/plugin/src/test/java/org/elasticsearch/license/plugin/LicensesClientServiceTests.java index 717403c83af..a99c77c22c1 100644 --- a/plugin/src/test/java/org/elasticsearch/license/plugin/LicensesClientServiceTests.java +++ b/plugin/src/test/java/org/elasticsearch/license/plugin/LicensesClientServiceTests.java @@ -127,7 +127,7 @@ public class LicensesClientServiceTests extends AbstractLicensesServiceTests { assertThat(pre.matches(now - expiryDuration.getMillis(), now), equalTo(false)); } - @Test @Ignore + @Test public void testMultipleEventNotification() throws Exception { final LicensesManagerService licensesManagerService = masterLicensesManagerService(); final LicensesClientService clientService = licensesClientService(); @@ -135,35 +135,38 @@ public class LicensesClientServiceTests extends AbstractLicensesServiceTests { TestTrackingClientListener clientListener = new TestTrackingClientListener(feature, true); List callbacks = new ArrayList<>(); - final AtomicInteger triggerCount1 = new AtomicInteger(0); - callbacks.add(preCallbackLatch(TimeValue.timeValueMillis(499), TimeValue.timeValueMillis(999), TimeValue.timeValueMillis(100), triggerCount1)); - final AtomicInteger triggerCount2 = new AtomicInteger(0); - callbacks.add(postCallbackLatch(TimeValue.timeValueMillis(10), null, TimeValue.timeValueMillis(200), triggerCount2)); - final AtomicInteger triggerCount3 = new AtomicInteger(0); - callbacks.add(preCallbackLatch(TimeValue.timeValueSeconds(1), TimeValue.timeValueSeconds(2), TimeValue.timeValueMillis(500), triggerCount3)); + final AtomicInteger preTriggerCountStartingFromOneSecond = new AtomicInteger(0); + callbacks.add(preCallbackLatch(TimeValue.timeValueMillis(499), TimeValue.timeValueMillis(999), TimeValue.timeValueMillis(100), preTriggerCountStartingFromOneSecond)); + final AtomicInteger postTriggerCount = new AtomicInteger(0); + callbacks.add(postCallbackLatch(TimeValue.timeValueMillis(10), null, TimeValue.timeValueMillis(200), postTriggerCount)); + final AtomicInteger preTriggerCountStartingFromTwoSeconds = new AtomicInteger(0); + callbacks.add(preCallbackLatch(TimeValue.timeValueSeconds(1), TimeValue.timeValueSeconds(2), TimeValue.timeValueMillis(500), preTriggerCountStartingFromTwoSeconds)); List actions = new ArrayList<>(); actions.add(registerWithEventNotification(clientService, clientListener, feature, TimeValue.timeValueSeconds(3), callbacks)); actions.add(assertExpiryAction(feature, "trial", TimeValue.timeValueMinutes(1))); assertClientListenerNotificationCount(clientListener, actions); - assertThat(triggerCount3.get(), greaterThanOrEqualTo(2)); - assertThat(triggerCount3.get(), lessThan(4)); - assertThat(triggerCount1.get(), greaterThan(4)); + assertThat(preTriggerCountStartingFromTwoSeconds.get(), greaterThanOrEqualTo(2)); + assertThat(preTriggerCountStartingFromTwoSeconds.get(), lessThan(4)); + assertThat(preTriggerCountStartingFromOneSecond.get(), greaterThan(4)); + int initialPreTriggerStartingFromTwoSeconds = preTriggerCountStartingFromTwoSeconds.get(); + int initialPreTriggerCountStartingFromOneSecond = preTriggerCountStartingFromOneSecond.get(); assertThat(awaitBusy(new Predicate() { @Override public boolean apply(Object o) { - if (triggerCount2.get() > 0) { + if (postTriggerCount.get() > 2) { return true; } return false; } }, 2, TimeUnit.SECONDS), equalTo(true)); - int previousTriggerCount = triggerCount2.get(); + int previousTriggerCount = postTriggerCount.get(); // Update license generateAndPutSignedLicenseAction(licensesManagerService, feature, TimeValue.timeValueSeconds(10)).run(); - Thread.sleep(500); - assertThat(previousTriggerCount, lessThanOrEqualTo(triggerCount2.get() + 1)); + assertThat(previousTriggerCount, lessThanOrEqualTo(postTriggerCount.get() + 1)); + assertThat(initialPreTriggerCountStartingFromOneSecond, equalTo(preTriggerCountStartingFromOneSecond.get())); + assertThat(initialPreTriggerStartingFromTwoSeconds, equalTo(preTriggerCountStartingFromTwoSeconds.get())); } @Test @@ -352,7 +355,7 @@ public class LicensesClientServiceTests extends AbstractLicensesServiceTests { for (int i = 0; i < randomIntBetween(5, 10); i++) { String feature = "feature_" + String.valueOf(i); final TestTrackingClientListener clientListener = new TestTrackingClientListener(feature); - expiryDuration = TimeValue.timeValueMillis(randomIntBetween(1, 3) * 1000l + expiryDuration.millis()); + expiryDuration = TimeValue.timeValueMillis(randomIntBetween(2, 4) * 1000l + expiryDuration.millis()); List actions = new ArrayList<>(); if (randomBoolean()) {