[TEST] Restructure notification; Increase logging; Add back tests
Original commit: elastic/x-pack-elasticsearch@eed6bdfa11
This commit is contained in:
parent
83651e3314
commit
1d3457427c
|
@ -894,13 +894,13 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
|
|||
}
|
||||
}
|
||||
|
||||
private Runnable triggerJob() {
|
||||
private Runnable triggerJob(final ExpirationCallback callback) {
|
||||
return new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
LicensesMetaData currentLicensesMetaData = clusterService.state().metaData().custom(LicensesMetaData.TYPE);
|
||||
final Map<String, License> effectiveLicenses = getEffectiveLicenses(currentLicensesMetaData);
|
||||
triggerEvents(effectiveLicenses.get(feature), System.currentTimeMillis());
|
||||
triggerEvent(effectiveLicenses.get(feature), System.currentTimeMillis(), callback);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -935,7 +935,7 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
|
|||
delay = expiryDuration - expirationCallback.max().getMillis();
|
||||
break;
|
||||
case POST:
|
||||
if (expiryDuration > 0l) {
|
||||
if (expiryDuration >= 0l) {
|
||||
delay = expiryDuration + expirationCallback.min().getMillis();
|
||||
} else {
|
||||
delay = (-1l * expiryDuration) - expirationCallback.min().getMillis();
|
||||
|
@ -943,24 +943,39 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
|
|||
break;
|
||||
}
|
||||
if (delay > 0l) {
|
||||
notificationQueue.add(threadPool.schedule(TimeValue.timeValueMillis(delay), executorName(), triggerJob()));
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Adding first notification for: orientation: " + expirationCallback.orientation().name()
|
||||
+ " min: " + expirationCallback.min()
|
||||
+ " max: " + expirationCallback.max()
|
||||
+ " with delay: " + TimeValue.timeValueMillis(delay)
|
||||
+ " license expiry duration: " + TimeValue.timeValueMillis(expiryDuration));
|
||||
}
|
||||
notificationQueue.add(threadPool.schedule(TimeValue.timeValueMillis(delay), executorName(), triggerJob(expirationCallback)));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (license != null) {
|
||||
// schedule first event of callbacks that match
|
||||
triggerEvents(license, now);
|
||||
logger.debug("Calling TRIGGER_EVENTS with license for " + license.feature() + " expiry: " + license.expiryDate());
|
||||
for (ExpirationCallback expirationCallback : expirationCallbacks) {
|
||||
triggerEvent(license, now, expirationCallback);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void triggerEvents(final License license, long now) {
|
||||
for (ExpirationCallback expirationCallback : expirationCallbacks) {
|
||||
if (expirationCallback.matches(license.expiryDate(), now)) {
|
||||
long expiryDuration = license.expiryDate() - now;
|
||||
boolean expired = expiryDuration <= 0l;
|
||||
expirationCallback.on(license, new ExpirationStatus(expired, TimeValue.timeValueMillis((!expired) ? expiryDuration : (-1l * expiryDuration))));
|
||||
notificationQueue.add(threadPool.schedule(expirationCallback.frequency(), executorName(), triggerJob()));
|
||||
private void triggerEvent(final License license, long now, final ExpirationCallback expirationCallback) {
|
||||
if (expirationCallback.matches(license.expiryDate(), now)) {
|
||||
long expiryDuration = license.expiryDate() - now;
|
||||
boolean expired = expiryDuration <= 0l;
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Calling notification on: orientation: " + expirationCallback.orientation().name()
|
||||
+ " min: " + expirationCallback.min()
|
||||
+ " max: " + expirationCallback.max()
|
||||
+ " scheduled after: " + expirationCallback.frequency().getMillis()
|
||||
+ " next interval match: " + expirationCallback.matches(license.expiryDate(), System.currentTimeMillis() + expirationCallback.frequency().getMillis()));
|
||||
}
|
||||
expirationCallback.on(license, new ExpirationStatus(expired, TimeValue.timeValueMillis((!expired) ? expiryDuration : (-1l * expiryDuration))));
|
||||
notificationQueue.add(threadPool.schedule(expirationCallback.frequency(), executorName(), triggerJob(expirationCallback)));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -117,7 +117,7 @@ public abstract class AbstractLicensesIntegrationTests extends ElasticsearchInte
|
|||
}
|
||||
return true;
|
||||
}
|
||||
}, 2, TimeUnit.SECONDS), equalTo(true));
|
||||
}, 5, TimeUnit.SECONDS), equalTo(true));
|
||||
}
|
||||
|
||||
protected void assertEagerConsumerPluginDisableNotification(int timeoutInSec) throws InterruptedException {
|
||||
|
|
|
@ -127,7 +127,7 @@ public class LicensesClientServiceTests extends AbstractLicensesServiceTests {
|
|||
assertThat(pre.matches(now - expiryDuration.getMillis(), now), equalTo(false));
|
||||
}
|
||||
|
||||
@Test @Ignore
|
||||
@Test
|
||||
public void testMultipleEventNotification() throws Exception {
|
||||
final LicensesManagerService licensesManagerService = masterLicensesManagerService();
|
||||
final LicensesClientService clientService = licensesClientService();
|
||||
|
@ -135,35 +135,38 @@ public class LicensesClientServiceTests extends AbstractLicensesServiceTests {
|
|||
TestTrackingClientListener clientListener = new TestTrackingClientListener(feature, true);
|
||||
|
||||
List<LicensesService.ExpirationCallback> callbacks = new ArrayList<>();
|
||||
final AtomicInteger triggerCount1 = new AtomicInteger(0);
|
||||
callbacks.add(preCallbackLatch(TimeValue.timeValueMillis(499), TimeValue.timeValueMillis(999), TimeValue.timeValueMillis(100), triggerCount1));
|
||||
final AtomicInteger triggerCount2 = new AtomicInteger(0);
|
||||
callbacks.add(postCallbackLatch(TimeValue.timeValueMillis(10), null, TimeValue.timeValueMillis(200), triggerCount2));
|
||||
final AtomicInteger triggerCount3 = new AtomicInteger(0);
|
||||
callbacks.add(preCallbackLatch(TimeValue.timeValueSeconds(1), TimeValue.timeValueSeconds(2), TimeValue.timeValueMillis(500), triggerCount3));
|
||||
final AtomicInteger preTriggerCountStartingFromOneSecond = new AtomicInteger(0);
|
||||
callbacks.add(preCallbackLatch(TimeValue.timeValueMillis(499), TimeValue.timeValueMillis(999), TimeValue.timeValueMillis(100), preTriggerCountStartingFromOneSecond));
|
||||
final AtomicInteger postTriggerCount = new AtomicInteger(0);
|
||||
callbacks.add(postCallbackLatch(TimeValue.timeValueMillis(10), null, TimeValue.timeValueMillis(200), postTriggerCount));
|
||||
final AtomicInteger preTriggerCountStartingFromTwoSeconds = new AtomicInteger(0);
|
||||
callbacks.add(preCallbackLatch(TimeValue.timeValueSeconds(1), TimeValue.timeValueSeconds(2), TimeValue.timeValueMillis(500), preTriggerCountStartingFromTwoSeconds));
|
||||
|
||||
List<Action> actions = new ArrayList<>();
|
||||
actions.add(registerWithEventNotification(clientService, clientListener, feature, TimeValue.timeValueSeconds(3), callbacks));
|
||||
actions.add(assertExpiryAction(feature, "trial", TimeValue.timeValueMinutes(1)));
|
||||
assertClientListenerNotificationCount(clientListener, actions);
|
||||
assertThat(triggerCount3.get(), greaterThanOrEqualTo(2));
|
||||
assertThat(triggerCount3.get(), lessThan(4));
|
||||
assertThat(triggerCount1.get(), greaterThan(4));
|
||||
assertThat(preTriggerCountStartingFromTwoSeconds.get(), greaterThanOrEqualTo(2));
|
||||
assertThat(preTriggerCountStartingFromTwoSeconds.get(), lessThan(4));
|
||||
assertThat(preTriggerCountStartingFromOneSecond.get(), greaterThan(4));
|
||||
int initialPreTriggerStartingFromTwoSeconds = preTriggerCountStartingFromTwoSeconds.get();
|
||||
int initialPreTriggerCountStartingFromOneSecond = preTriggerCountStartingFromOneSecond.get();
|
||||
assertThat(awaitBusy(new Predicate<Object>() {
|
||||
@Override
|
||||
public boolean apply(Object o) {
|
||||
if (triggerCount2.get() > 0) {
|
||||
if (postTriggerCount.get() > 2) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}, 2, TimeUnit.SECONDS), equalTo(true));
|
||||
int previousTriggerCount = triggerCount2.get();
|
||||
int previousTriggerCount = postTriggerCount.get();
|
||||
|
||||
// Update license
|
||||
generateAndPutSignedLicenseAction(licensesManagerService, feature, TimeValue.timeValueSeconds(10)).run();
|
||||
Thread.sleep(500);
|
||||
assertThat(previousTriggerCount, lessThanOrEqualTo(triggerCount2.get() + 1));
|
||||
assertThat(previousTriggerCount, lessThanOrEqualTo(postTriggerCount.get() + 1));
|
||||
assertThat(initialPreTriggerCountStartingFromOneSecond, equalTo(preTriggerCountStartingFromOneSecond.get()));
|
||||
assertThat(initialPreTriggerStartingFromTwoSeconds, equalTo(preTriggerCountStartingFromTwoSeconds.get()));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -352,7 +355,7 @@ public class LicensesClientServiceTests extends AbstractLicensesServiceTests {
|
|||
for (int i = 0; i < randomIntBetween(5, 10); i++) {
|
||||
String feature = "feature_" + String.valueOf(i);
|
||||
final TestTrackingClientListener clientListener = new TestTrackingClientListener(feature);
|
||||
expiryDuration = TimeValue.timeValueMillis(randomIntBetween(1, 3) * 1000l + expiryDuration.millis());
|
||||
expiryDuration = TimeValue.timeValueMillis(randomIntBetween(2, 4) * 1000l + expiryDuration.millis());
|
||||
List<Action> actions = new ArrayList<>();
|
||||
|
||||
if (randomBoolean()) {
|
||||
|
|
Loading…
Reference in New Issue