- handle Invalid license case at REST layer
 - improve notification mechanism
 - improve notification tests

Original commit: elastic/x-pack-elasticsearch@a6c26e1601
This commit is contained in:
Areek Zillur 2014-10-28 14:33:44 -04:00
parent 68270bb454
commit 8d6e0fc164
9 changed files with 242 additions and 71 deletions

View File

@ -8,28 +8,38 @@ package org.elasticsearch.license.plugin.action.put;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.license.plugin.core.LicensesStatus;
import java.io.IOException;
public class PutLicenseResponse extends AcknowledgedResponse {
private LicensesStatus status;
PutLicenseResponse() {
}
PutLicenseResponse(boolean acknowledged) {
PutLicenseResponse(boolean acknowledged, LicensesStatus status) {
super(acknowledged);
this.status = status;
}
public LicensesStatus status() {
return status;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
readAcknowledged(in);
status = LicensesStatus.fromId(in.readVInt());
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
writeAcknowledged(out);
out.writeVInt(status.id());
}
}

View File

@ -18,10 +18,12 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.plugin.core.ElasticsearchLicenseException;
import org.elasticsearch.license.plugin.core.LicensesManagerService;
import org.elasticsearch.license.plugin.core.LicensesService;
import org.elasticsearch.license.plugin.core.LicensesStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import static org.elasticsearch.license.plugin.core.LicensesService.LicensesUpdateResponse;
import static org.elasticsearch.license.plugin.core.LicensesService.PutLicenseRequestHolder;
public class TransportPutLicenseAction extends TransportMasterNodeOperationAction<PutLicenseRequest, PutLicenseResponse> {
@ -59,10 +61,10 @@ public class TransportPutLicenseAction extends TransportMasterNodeOperationActio
@Override
protected void masterOperation(final PutLicenseRequest request, ClusterState state, final ActionListener<PutLicenseResponse> listener) throws ElasticsearchException {
final PutLicenseRequestHolder requestHolder = new PutLicenseRequestHolder(request, "put licenses []");
LicensesStatus status = licensesManagerService.registerLicenses(requestHolder, new ActionListener<ClusterStateUpdateResponse>() {
licensesManagerService.registerLicenses(requestHolder, new ActionListener<LicensesUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse clusterStateUpdateResponse) {
listener.onResponse(new PutLicenseResponse(clusterStateUpdateResponse.isAcknowledged()));
public void onResponse(LicensesUpdateResponse licensesUpdateResponse) {
listener.onResponse(new PutLicenseResponse(licensesUpdateResponse.isAcknowledged(), licensesUpdateResponse.status()));
}
@Override
@ -70,15 +72,6 @@ public class TransportPutLicenseAction extends TransportMasterNodeOperationActio
listener.onFailure(e);
}
});
switch (status) {
case VALID:
return;
case INVALID:
throw new ElasticsearchLicenseException("Found Invalid License(s)");
case EXPIRED:
throw new ElasticsearchLicenseException("Found Expired License(s)");
}
}
}

View File

@ -14,6 +14,7 @@ import java.util.List;
import java.util.Set;
import static org.elasticsearch.license.plugin.core.LicensesService.DeleteLicenseRequestHolder;
import static org.elasticsearch.license.plugin.core.LicensesService.LicensesUpdateResponse;
import static org.elasticsearch.license.plugin.core.LicensesService.PutLicenseRequestHolder;
@ImplementedBy(LicensesService.class)
@ -21,7 +22,7 @@ public interface LicensesManagerService {
//TODO: documentation
public LicensesStatus registerLicenses(final PutLicenseRequestHolder requestHolder, final ActionListener<ClusterStateUpdateResponse> listener);
public void registerLicenses(final PutLicenseRequestHolder requestHolder, final ActionListener<LicensesUpdateResponse> listener);
public void unregisterLicenses(final DeleteLicenseRequestHolder requestHolder, final ActionListener<ClusterStateUpdateResponse> listener);

View File

@ -70,8 +70,6 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
private Queue<ListenerHolder> pendingRegistrations = new ConcurrentLinkedQueue<>();
private final AtomicReference<ScheduledFuture> notificationScheduler;
private final AtomicReference<LicensesMetaData> lastObservedLicensesState;
@Inject
@ -82,7 +80,7 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
this.threadPool = threadPool;
this.transportService = transportService;
this.lastObservedLicensesState = new AtomicReference<>(null);
this.notificationScheduler = new AtomicReference<>(null);
//this.notificationScheduler = new AtomicReference<>(null);
transportService.registerHandler(REGISTER_TRIAL_LICENSE_ACTION_NAME, new RegisterTrialLicenseRequestHandler());
}
@ -95,7 +93,7 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
* @return LicensesStatus indicating if the provided license(s) is VALID (accepted), INVALID (tampered license) or EXPIRED
*/
@Override
public LicensesStatus registerLicenses(final PutLicenseRequestHolder requestHolder, final ActionListener<ClusterStateUpdateResponse> listener) {
public void registerLicenses(final PutLicenseRequestHolder requestHolder, final ActionListener<LicensesUpdateResponse> listener) {
final PutLicenseRequest request = requestHolder.request;
final Set<ESLicense> newLicenses = Sets.newHashSet(request.licenses());
LicensesStatus status = checkLicenses(newLicenses);
@ -104,12 +102,12 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
break;
case INVALID:
case EXPIRED:
return status;
listener.onResponse(new LicensesUpdateResponse(true, status));
}
clusterService.submitStateUpdateTask(requestHolder.source, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {
clusterService.submitStateUpdateTask(requestHolder.source, new AckedClusterStateUpdateTask<LicensesUpdateResponse>(request, listener) {
@Override
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
return new ClusterStateUpdateResponse(acknowledged);
protected LicensesUpdateResponse newResponse(boolean acknowledged) {
return new LicensesUpdateResponse(acknowledged, LicensesStatus.VALID);
}
@Override
@ -124,7 +122,19 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
}
});
return LicensesStatus.VALID;
}
public static class LicensesUpdateResponse extends ClusterStateUpdateResponse {
private LicensesStatus status;
public LicensesUpdateResponse(boolean acknowledged, LicensesStatus status) {
super(acknowledged);
this.status = status;
}
public LicensesStatus status() {
return status;
}
}
@Override
@ -272,10 +282,12 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
@Override
protected void doClose() throws ElasticsearchException {
logger.info("Closing LicensesService");
/*
if (notificationScheduler.get() != null) {
notificationScheduler.get().cancel(true);
notificationScheduler.set(null);
}
*/
clusterService.remove(this);
if (registeredListeners != null) {
@ -293,7 +305,10 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (!event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
LicensesMetaData oldLicensesMetaData = event.previousState().getMetaData().custom(LicensesMetaData.TYPE);
LicensesMetaData currentLicensesMetaData = event.state().getMetaData().custom(LicensesMetaData.TYPE);
logLicenseMetaDataStats("old", oldLicensesMetaData);
logLicenseMetaDataStats("new", currentLicensesMetaData);
// Check pending feature registrations and try to complete registrations
if (!pendingRegistrations.isEmpty()) {
ListenerHolder pendingRegistrationLister;
@ -302,7 +317,11 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
logger.info("trying to register pending listener for " + pendingRegistrationLister.feature + " masterAvailable: " + masterAvailable);
if (!masterAvailable) {
// if the master is not available do not, break out of trying pendingRegistrations
pendingRegistrations.add(pendingRegistrationLister);
break;
} else {
logger.info("successfully registered listener for: " + pendingRegistrationLister.feature);
registeredListeners.add(pendingRegistrationLister);
}
}
}
@ -311,7 +330,6 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
// notifyFeaturesIfNeeded will short-circuit with -1 if the currentLicensesMetaData has been notified on earlier
// Change to debug
logger.info("calling notifyFeaturesAndScheduleNotificationIfNeeded from clusterChanged");
LicensesMetaData currentLicensesMetaData = event.state().getMetaData().custom(LicensesMetaData.TYPE);
notifyFeaturesAndScheduleNotificationIfNeeded(currentLicensesMetaData);
} else {
logger.info("clusterChanged: no action [has STATE_NOT_RECOVERED_BLOCK]");
@ -322,27 +340,16 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
final LicensesMetaData lastNotifiedLicensesMetaData = lastObservedLicensesState.get();
if (lastNotifiedLicensesMetaData != null && lastNotifiedLicensesMetaData.equals(currentLicensesMetaData)) {
logger.info("currentLicensesMetaData has been already notified on");
return;
//return;
}
notifyFeaturesAndScheduleNotification(currentLicensesMetaData);
}
private long notifyFeaturesAndScheduleNotification(LicensesMetaData currentLicensesMetaData) {
private void notifyFeaturesAndScheduleNotification(LicensesMetaData currentLicensesMetaData) {
long nextScheduleFrequency = notifyFeatures(currentLicensesMetaData);
logger.info("Condition to register new notification schedule: null notification: " + (notificationScheduler.get() == null) + " , nextScheduleFreq: " + (nextScheduleFrequency != -1));
if (notificationScheduler.get() == null && nextScheduleFrequency != -1l) {
logger.info("enabling licensing client notifications");
notificationScheduler.set(threadPool.schedule(TimeValue.timeValueMillis(nextScheduleFrequency), executorName(),
new SubmitReschedulingLicensingClientNotificationJob()));
} else {
if (notificationScheduler.get() != null) {
logger.info("disable license client notification");
notificationScheduler.get().cancel(true);
// set it to null so that new notifications can be scheduled on licensesMetaData change (cluster state change) if needed
notificationScheduler.set(null);
}
if (nextScheduleFrequency != -1l) {
scheduleNextNotification(nextScheduleFrequency);
}
return nextScheduleFrequency;
}
private void logLicenseMetaDataStats(String prefix, LicensesMetaData licensesMetaData) {
@ -356,8 +363,13 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
@Override
public void register(String feature, TrialLicenseOptions trialLicenseOptions, Listener listener) {
final ListenerHolder listenerHolder = new ListenerHolder(feature, trialLicenseOptions, listener);
registeredListeners.add(listenerHolder);
registerListener(listenerHolder);
if (registerListener(listenerHolder)) {
logger.info("successfully registered listener for: " + listenerHolder.feature);
registeredListeners.add(listenerHolder);
} else {
logger.info("add listener for: " + listenerHolder.feature + " to pending registration queue");
pendingRegistrations.add(listenerHolder);
}
}
/**
@ -372,6 +384,10 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
*/
private boolean registerListener(final ListenerHolder listenerHolder) {
logger.info("Registering listener for " + listenerHolder.feature);
if (clusterService.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
logger.info("Store as pendingRegistration [cluster has NOT_RECOVERED_BLOCK]");
return false;
}
LicensesMetaData currentMetaData = clusterService.state().metaData().custom(LicensesMetaData.TYPE);
if (!hasLicenseForFeature(listenerHolder.feature, currentMetaData)) {
@ -393,7 +409,6 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
} else {
// could not sent register trial license request to master
logger.info("Store as pendingRegistration [master not available yet]");
pendingRegistrations.add(listenerHolder);
return false;
}
}
@ -441,17 +456,13 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
}
public class SubmitReschedulingLicensingClientNotificationJob implements Runnable {
@Override
public void run() {
if (logger.isTraceEnabled()) {
logger.trace("Submitting new rescheduling licensing client notification job");
}
try {
threadPool.executor(executorName()).execute(new LicensingClientNotificationJob());
} catch (EsRejectedExecutionException ex) {
logger.info("Couldn't re-schedule licensing client notification job", ex);
}
private void scheduleNextNotification(long nextScheduleDelay) {
try {
final TimeValue delay = TimeValue.timeValueMillis(nextScheduleDelay);
threadPool.schedule(delay, executorName(), new LicensingClientNotificationJob());
logger.info("Scheduling next notification after: " + delay);
} catch (EsRejectedExecutionException ex) {
logger.info("Couldn't re-schedule licensing client notification job", ex);
}
}
@ -466,6 +477,17 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
logger.trace("Performing LicensingClientNotificationJob");
}
LicensesMetaData currentLicensesMetaData = clusterService.state().metaData().custom(LicensesMetaData.TYPE);
long nextScheduleDelay = notifyFeatures(currentLicensesMetaData);
if (nextScheduleDelay != -1l) {
try {
scheduleNextNotification(nextScheduleDelay);
} catch (EsRejectedExecutionException ex) {
logger.info("Reschedule licensing client notification job was rejected", ex);
}
}
/*
LicensesMetaData currentLicensesMetaData = clusterService.state().metaData().custom(LicensesMetaData.TYPE);
// Change to debug
@ -482,12 +504,11 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
threadPool.schedule(updateFrequency, executorName(), new SubmitReschedulingLicensingClientNotificationJob());
} catch (EsRejectedExecutionException ex) {
logger.info("Reschedule licensing client notification job was rejected", ex);
}
}*/
}
}
private long notifyFeatures(LicensesMetaData currentLicensesMetaData) {
LicensesWrapper licensesWrapper = LicensesWrapper.wrap(currentLicensesMetaData);
long nextScheduleFrequency = -1l;
long offset = TimeValue.timeValueMillis(100).getMillis();
StringBuilder sb = new StringBuilder("Registered listeners: [ ");
@ -539,10 +560,10 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
sb.append("]");
logger.info(sb.toString());
lastObservedLicensesState.set(licensesWrapper.get());
lastObservedLicensesState.set(currentLicensesMetaData);
if (nextScheduleFrequency == -1l) {
logger.info("turn off notifications");
logger.info("no need to schedule next notification");
} else {
logger.info("next notification time: " + TimeValue.timeValueMillis(nextScheduleFrequency).toString());
}
@ -732,10 +753,10 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
//Should not be exposed; used by testing only
public void clear() {
if (notificationScheduler.get() != null) {
/*if (notificationScheduler.get() != null) {
notificationScheduler.get().cancel(true);
notificationScheduler.set(null);
}
}*/
registeredListeners.clear();
}
}

View File

@ -5,6 +5,8 @@
*/
package org.elasticsearch.license.plugin.core;
import org.elasticsearch.ElasticsearchIllegalStateException;
public enum LicensesStatus {
VALID((byte) 0),
INVALID((byte) 1),
@ -14,4 +16,20 @@ public enum LicensesStatus {
LicensesStatus(byte id) {
this.id = id;
}
public int id() {
return id;
}
public static LicensesStatus fromId(int id) {
if (id == 0) {
return VALID;
} else if (id == 1) {
return INVALID;
} else if (id == 2) {
return EXPIRED;
} else {
throw new ElasticsearchIllegalStateException("no valid LicensesStatus for id=" + id);
}
}
}

View File

@ -5,18 +5,25 @@
*/
package org.elasticsearch.license.plugin.rest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.license.plugin.action.put.PutLicenseAction;
import org.elasticsearch.license.plugin.action.put.PutLicenseRequest;
import org.elasticsearch.license.plugin.action.put.PutLicenseResponse;
import org.elasticsearch.license.plugin.core.LicensesStatus;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.support.AcknowledgedRestListener;
import java.io.IOException;
import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.rest.RestRequest.Method.PUT;
@ -35,6 +42,32 @@ public class RestPutLicenseAction extends BaseRestHandler {
PutLicenseRequest putLicenseRequest = new PutLicenseRequest();
putLicenseRequest.listenerThreaded(false);
putLicenseRequest.licenses(request.content().toUtf8());
client.admin().cluster().execute(PutLicenseAction.INSTANCE, putLicenseRequest, new AcknowledgedRestListener<PutLicenseResponse>(channel));
client.admin().cluster().execute(PutLicenseAction.INSTANCE, putLicenseRequest, new LicensesAcknowledgedListener(channel));
}
private class LicensesAcknowledgedListener extends AcknowledgedRestListener<PutLicenseResponse> {
public LicensesAcknowledgedListener(RestChannel channel) {
super(channel);
}
@Override
protected void addCustomFields(XContentBuilder builder, PutLicenseResponse response) throws IOException {
LicensesStatus status = response.status();
String statusString = null;
switch (status) {
case VALID:
statusString = "valid";
break;
case INVALID:
statusString = "invalid";
break;
case EXPIRED:
statusString = "expired";
break;
}
builder.field("licenses_status", statusString);
}
}
}

View File

@ -25,6 +25,14 @@ public class AbstractLicensingTestBase {
}
public static String getTestPriKeyPath() throws Exception {
return getResourcePath("/private.key");
}
public static String getTestPubKeyPath() throws Exception {
return getResourcePath("/public.key");
}
private static String getResourcePath(String resource) throws Exception {
URL url = ESLicenseManager.class.getResource(resource);
return url.toURI().getPath();

View File

@ -5,11 +5,22 @@
*/
package org.elasticsearch.license.plugin;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.common.base.Predicate;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.license.AbstractLicensingTestBase;
import org.elasticsearch.license.core.ESLicense;
import org.elasticsearch.license.licensor.ESLicenseSigner;
import org.elasticsearch.license.plugin.action.put.PutLicenseRequest;
import org.elasticsearch.license.plugin.action.put.PutLicenseRequestBuilder;
import org.elasticsearch.license.plugin.action.put.PutLicenseResponse;
import org.elasticsearch.license.plugin.core.LicensesManagerService;
import org.elasticsearch.license.plugin.core.LicensesService;
import org.elasticsearch.license.plugin.core.LicensesStatus;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.InternalTestCluster;
import org.junit.Before;
@ -18,14 +29,18 @@ import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.license.AbstractLicensingTestBase.getTestPriKeyPath;
import static org.elasticsearch.license.AbstractLicensingTestBase.getTestPubKeyPath;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.SUITE;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.TEST;
import static org.hamcrest.CoreMatchers.equalTo;
@ClusterScope(scope = TEST, numDataNodes = 10, numClientNodes = 0)
@ClusterScope(scope = TEST, numDataNodes = 3, numClientNodes = 0)
public class LicensesPluginIntegrationTests extends ElasticsearchIntegrationTest {
private final int trialLicenseDurationInSeconds = 5;
@ -46,7 +61,7 @@ public class LicensesPluginIntegrationTests extends ElasticsearchIntegrationTest
}
@Test
public void test() throws InterruptedException {
public void test() throws Exception {
// managerService should report feature to be enabled on all data nodes
assertThat(awaitBusy(new Predicate<Object>() {
@Override
@ -62,7 +77,7 @@ public class LicensesPluginIntegrationTests extends ElasticsearchIntegrationTest
// consumer plugin service should return enabled on all data nodes
assertThat(awaitBusy(new Predicate<Object>() {
/*assertThat(awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object o) {
for (TestPluginService pluginService : consumerPluginServices()) {
@ -72,7 +87,8 @@ public class LicensesPluginIntegrationTests extends ElasticsearchIntegrationTest
}
return true;
}
}, 2, TimeUnit.SECONDS), equalTo(true));
}, 2, TimeUnit.SECONDS), equalTo(true));*/
assertConsumerPluginEnableNotification(2);
// consumer plugin should notify onDisabled on all data nodes (expired trial license)
@ -89,6 +105,76 @@ public class LicensesPluginIntegrationTests extends ElasticsearchIntegrationTest
}
}, trialLicenseDurationInSeconds * 2, TimeUnit.SECONDS), equalTo(true));
// consumer plugin should notify onEnabled on all data nodes (signed license)
/*
ESLicense license = generateSignedLicense(TestPluginService.FEATURE_NAME, TimeValue.timeValueSeconds(5));
final PutLicenseResponse putLicenseResponse = new PutLicenseRequestBuilder(client().admin().cluster()).setLicense(Lists.newArrayList(license)).get();
assertThat(putLicenseResponse.isAcknowledged(), equalTo(true));
assertThat(putLicenseResponse.status(), equalTo(LicensesStatus.VALID));
logger.info(" --> put signed license");
assertThat(awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object o) {
for (TestPluginService pluginService : consumerPluginServices()) {
if (!pluginService.enabled()) {
return false;
}
}
return true;
}
}, 2, TimeUnit.SECONDS), equalTo(true));
assertThat(awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object o) {
for (TestPluginService pluginService : consumerPluginServices()) {
if (pluginService.enabled()) {
return false;
}
}
return true;
}
}, 5, TimeUnit.SECONDS), equalTo(true));
*/
}
private void assertConsumerPluginDisableNotification(int timeoutInSec) throws InterruptedException {
assertConsumerPluginNotification(false, timeoutInSec);
}
private void assertConsumerPluginEnableNotification(int timeoutInSec) throws InterruptedException {
assertConsumerPluginNotification(true, timeoutInSec);
}
private void assertConsumerPluginNotification(final boolean expectedEnabled, int timeoutInSec) throws InterruptedException {
assertThat(awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object o) {
for (TestPluginService pluginService : consumerPluginServices()) {
if (expectedEnabled != pluginService.enabled()) {
return false;
}
}
return true;
}
}, timeoutInSec, TimeUnit.SECONDS), equalTo(true));
}
private ESLicense generateSignedLicense(String feature, TimeValue expiryDate) throws Exception {
final ESLicense licenseSpec = ESLicense.builder()
.uid(UUID.randomUUID().toString())
.feature(feature)
.expiryDate(expiryDate.getMillis())
.issueDate(System.currentTimeMillis())
.type("subscription")
.subscriptionType("gold")
.issuedTo("customer")
.issuer("elasticsearch")
.maxNodes(10)
.build();
ESLicenseSigner signer = new ESLicenseSigner(getTestPriKeyPath(), getTestPubKeyPath());
return signer.sign(licenseSpec);
}
private Iterable<TestPluginService> consumerPluginServices() {

View File

@ -36,6 +36,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.license.plugin.core.LicensesService.LicensesUpdateResponse;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.TEST;
import static org.hamcrest.Matchers.equalTo;
@ -150,9 +151,9 @@ public class LicensesServiceTests extends ElasticsearchIntegrationTest {
ESLicenseManager esLicenseManager = masterLicenseManager();
final CountDownLatch latch1 = new CountDownLatch(1);
// todo: fix with awaitBusy
licensesManagerService.registerLicenses(new LicensesService.PutLicenseRequestHolder(new PutLicenseRequest().licenses(licenses), "test"), new ActionListener<ClusterStateUpdateResponse>() {
licensesManagerService.registerLicenses(new LicensesService.PutLicenseRequestHolder(new PutLicenseRequest().licenses(licenses), "test"), new ActionListener<LicensesUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse clusterStateUpdateResponse) {
public void onResponse(LicensesUpdateResponse clusterStateUpdateResponse) {
if (clusterStateUpdateResponse.isAcknowledged()) {
latch1.countDown();
}
@ -178,9 +179,9 @@ public class LicensesServiceTests extends ElasticsearchIntegrationTest {
List<ESLicense> licenses2 = ESLicenses.fromSource(licenseOutput);
final CountDownLatch latch2 = new CountDownLatch(1);
// todo: fix with awaitBusy
licensesManagerService.registerLicenses(new LicensesService.PutLicenseRequestHolder(new PutLicenseRequest().licenses(licenses2), "test"), new ActionListener<ClusterStateUpdateResponse>() {
licensesManagerService.registerLicenses(new LicensesService.PutLicenseRequestHolder(new PutLicenseRequest().licenses(licenses2), "test"), new ActionListener<LicensesUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse clusterStateUpdateResponse) {
public void onResponse(LicensesUpdateResponse clusterStateUpdateResponse) {
if (clusterStateUpdateResponse.isAcknowledged()) {
latch2.countDown();
}
@ -289,9 +290,9 @@ public class LicensesServiceTests extends ElasticsearchIntegrationTest {
final CountDownLatch latch1 = new CountDownLatch(1);
// todo: fix with awaitBusy
masterLicensesManagerService.registerLicenses(new LicensesService.PutLicenseRequestHolder(new PutLicenseRequest().licenses(licenses), "test"), new ActionListener<ClusterStateUpdateResponse>() {
masterLicensesManagerService.registerLicenses(new LicensesService.PutLicenseRequestHolder(new PutLicenseRequest().licenses(licenses), "test"), new ActionListener<LicensesUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse clusterStateUpdateResponse) {
public void onResponse(LicensesUpdateResponse clusterStateUpdateResponse) {
if (clusterStateUpdateResponse.isAcknowledged()) {
latch1.countDown();
}