LicensesService Notification work

Original commit: elastic/x-pack-elasticsearch@cd7260dec9
This commit is contained in:
Areek Zillur 2014-10-22 23:24:00 -04:00
parent 996f7aadf7
commit 59b101cfab
4 changed files with 173 additions and 64 deletions

View File

@ -55,6 +55,34 @@ public class LicensesMetaData implements MetaData.Custom {
return encodedTrialLicenses;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof LicensesMetaData) {
LicensesMetaData other = (LicensesMetaData) obj;
boolean signaturesEqual = false;
boolean trialLicensesEqual = false;
if (other.getSignatures() != null) {
if (this.getSignatures() != null) {
signaturesEqual = other.getSignatures().equals(this.getSignatures());
} else {
return false;
}
}
if (other.getEncodedTrialLicenses() != null) {
if (this.getEncodedTrialLicenses() != null) {
trialLicensesEqual = other.getEncodedTrialLicenses().equals(this.getEncodedTrialLicenses());
} else {
return false;
}
}
return signaturesEqual && trialLicensesEqual;
}
return false;
}
/**
* Licenses metadata factory
*/

View File

@ -20,6 +20,8 @@ import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Singleton;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
@ -32,12 +34,16 @@ import org.elasticsearch.license.plugin.core.trial.TrialLicenseUtils;
import org.elasticsearch.license.plugin.core.trial.TrialLicenses;
import org.elasticsearch.license.plugin.core.trial.TrialLicensesBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.license.manager.Utils.reduceAndMap;
import static org.elasticsearch.license.plugin.core.trial.TrialLicenses.TrialLicense;
/**
@ -47,26 +53,33 @@ import static org.elasticsearch.license.plugin.core.trial.TrialLicenses.TrialLic
* - LicensesClientService - allow interested plugins (features) to register to licensing notifications
*
* TODO: documentation
* TODO: figure out when to check GatewayService.STATE_NOT_RECOVERED_BLOCK
*/
@Singleton
public class LicensesService extends AbstractLifecycleComponent<LicensesService> implements ClusterStateListener, LicensesManagerService, LicensesClientService {
private ESLicenseManager esLicenseManager;
public static final String REGISTER_TRIAL_LICENSE_ACTION_NAME = "internal:cluster/licenses/register_trial_license";
private ClusterService clusterService;
private final ESLicenseManager esLicenseManager;
private ThreadPool threadPool;
private final ClusterService clusterService;
private final ThreadPool threadPool;
private final TransportService transportService;
private List<ListenerHolder> registeredListeners = new CopyOnWriteArrayList<>();
private volatile ScheduledFuture notificationScheduler;
@Inject
public LicensesService(Settings settings, ClusterService clusterService, ThreadPool threadPool) {
public LicensesService(Settings settings, ClusterService clusterService, ThreadPool threadPool, TransportService transportService) {
super(settings);
this.clusterService = clusterService;
this.esLicenseManager = new ESLicenseManager();
this.threadPool = threadPool;
this.transportService = transportService;
transportService.registerHandler(REGISTER_TRIAL_LICENSE_ACTION_NAME, new RegisterTrialLicenseRequestHandler());
}
/**
@ -134,7 +147,7 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
@Override
public LicensesStatus checkLicenses(Set<ESLicense> licenses) {
final ImmutableMap<String, ESLicense> map = org.elasticsearch.license.manager.Utils.reduceAndMap(licenses);
final ImmutableMap<String, ESLicense> map = reduceAndMap(licenses);
return checkLicenses(map);
}
@ -191,21 +204,27 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
}
private void registerTrialLicense(final TrialLicense trialLicense) {
private void registerTrialLicense(final RegisterTrialLicenseRequest request) {
clusterService.submitStateUpdateTask("register trial license []", new ProcessedClusterStateUpdateTask() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
// Change to debug
logger.info("Processed Trial License registration");
LicensesMetaData licensesMetaData = newState.metaData().custom(LicensesMetaData.TYPE);
if (licensesMetaData != null) {
logger.info("New state: signedLicenses: " + licensesMetaData.getSignatures().size() + " trialLicenses: " + licensesMetaData.getEncodedTrialLicenses().size());
}
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
MetaData metaData = currentState.metaData();
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
LicensesMetaData currentLicenses = metaData.custom(LicensesMetaData.TYPE);
final LicensesWrapper licensesWrapper = LicensesWrapper.wrap(currentLicenses);
LicensesMetaData currentLicensesMetaData = metaData.custom(LicensesMetaData.TYPE);
final LicensesWrapper licensesWrapper = LicensesWrapper.wrap(currentLicensesMetaData);
// do not generate a trial license for a feature that already has a signed license
if (!esLicenseManager.hasLicenseForFeature(trialLicense.feature(), getEffectiveLicenses())) {
licensesWrapper.addTrialLicense(trialLicense);
if (!esLicenseManager.hasLicenseForFeature(request.feature, getEffectiveLicenses())) {
licensesWrapper.addTrialLicense(generateTrialLicense(request.feature, request.duration, request.maxNodes));
}
mdBuilder.putCustom(LicensesMetaData.TYPE, licensesWrapper.createLicensesMetaData());
return ClusterState.builder(currentState).metaData(mdBuilder).build();
@ -215,7 +234,19 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
public void onFailure(String source, @Nullable Throwable t) {
logger.info("LicensesService: " + source, t);
}
private TrialLicense generateTrialLicense(String feature, TimeValue duration, int maxNodes) {
return TrialLicensesBuilder.trialLicenseBuilder()
.issuedTo(clusterService.state().getClusterName().value())
.issueDate(System.currentTimeMillis())
.duration(duration)
.feature(feature)
.maxNodes(maxNodes)
.build();
}
});
}
@Override
@ -251,7 +282,25 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (!event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
performMasterNodeOperations(event);
// notify all interested plugins
if (checkIfUpdatedMetaData(event)) {
final LicensesMetaData currentLicensesMetaData = event.state().getMetaData().custom(LicensesMetaData.TYPE);
// Change to debug
if (currentLicensesMetaData != null) {
logger.info("LicensesMetaData: signedLicenses: " + currentLicensesMetaData.getSignatures().size() + " trialLicenses: " + currentLicensesMetaData.getEncodedTrialLicenses().size());
} else {
logger.info("LicensesMetaData: signedLicenses: 0 trialLicenses: 0");
}
// Change to debug
logger.info("calling notifyFeatures from clusterChanged");
long nextScheduleFrequency = notifyFeatures(currentLicensesMetaData);
if (notificationScheduler == null) {
notificationScheduler = threadPool.schedule(TimeValue.timeValueMillis(nextScheduleFrequency), executorName(),
new SubmitReschedulingLicensingClientNotificationJob());
}
}
}
}
@ -267,10 +316,8 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
registeredListeners.add(new ListenerHolder(feature, trialLicenseOptions, listener));
// DO we need to check STATE_NOT_RECOVERED_BLOCK here
if (clusterService.state().nodes().localNodeMaster()) {
LicensesMetaData currentMetaData = clusterService.state().metaData().custom(LicensesMetaData.TYPE);
registerListeners(currentMetaData);
}
LicensesMetaData currentMetaData = clusterService.state().metaData().custom(LicensesMetaData.TYPE);
registerListeners(currentMetaData);
}
private void registerListeners(LicensesMetaData currentMetaData) {
@ -281,10 +328,18 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
TrialLicenseOptions options = listenerHolder.trialLicenseOptions;
if (options != null) {
// Trial license option is provided
TrialLicense trialLicense = generateTrialLicense(listenerHolder.feature, options.durationInDays, options.maxNodes);
registerTrialLicense(trialLicense);
RegisterTrialLicenseRequest request = new RegisterTrialLicenseRequest(listenerHolder.feature,
new TimeValue(options.durationInDays, TimeUnit.DAYS), options.maxNodes);
if (clusterService.state().nodes().localNodeMaster()) {
registerTrialLicense(request);
} else {
transportService.sendRequest(clusterService.state().nodes().masterNode(),
REGISTER_TRIAL_LICENSE_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
}
} else {
// notify feature as clusterChangedEvent may not happen
// Change to debug
logger.info("calling notifyFeatures from registerListeners");
notifyFeatures(currentMetaData);
}
}
@ -292,24 +347,6 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
}
}
private void performMasterNodeOperations(ClusterChangedEvent event) {
if (event.state().nodes().localNodeMaster()) {
final LicensesMetaData currentLicensesMetaData = event.state().getMetaData().custom(LicensesMetaData.TYPE);
// register all interested plugins
registerListeners(currentLicensesMetaData);
// notify all interested plugins
if (currentLicensesMetaData != null && checkIfUpdatedMetaData(event)) {
long nextScheduleFrequency = notifyFeatures(currentLicensesMetaData);
if (notificationScheduler == null) {
notificationScheduler = threadPool.schedule(TimeValue.timeValueMillis(nextScheduleFrequency), executorName(),
new SubmitReschedulingLicensingClientNotificationJob());
}
}
}
}
private String executorName() {
return ThreadPool.Names.GENERIC;
}
@ -324,7 +361,7 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
for (String signature : metaData.getSignatures()) {
esLicenses.add(esLicenseManager.fromSignature(signature));
}
return org.elasticsearch.license.manager.Utils.reduceAndMap(esLicenses);
return reduceAndMap(esLicenses);
}
return ImmutableMap.copyOf(map);
@ -368,11 +405,9 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
if (clusterService.state().nodes().localNodeMaster()) {
LicensesMetaData currentLicensesMetaData = clusterService.state().metaData().custom(LicensesMetaData.TYPE);
long nextScheduleFrequency = TimeValue.timeValueMinutes(5).getMillis();
if (currentLicensesMetaData != null) {
nextScheduleFrequency = Math.max(nextScheduleFrequency, notifyFeatures(currentLicensesMetaData));
}
// Change to debug
logger.info("calling notifyFeatures from LicensingClientNotificationJob");
long nextScheduleFrequency = Math.max(TimeValue.timeValueMinutes(5).getMillis(), notifyFeatures(currentLicensesMetaData));
TimeValue updateFrequency = TimeValue.timeValueMillis(nextScheduleFrequency);
if (this.reschedule) {
@ -406,6 +441,8 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
}
long expiryDuration = expiryDate - System.currentTimeMillis();
if (expiryDuration > 0l) {
// Change to debug
logger.info("calling enabledFeatureIfNeeded on " + listenerHolder.feature + " with trialLicense size=" + licensesWrapper.encodedTrialLicenses.size());
listenerHolder.enableFeatureIfNeeded();
if (nextScheduleFrequency == -1l) {
nextScheduleFrequency = expiryDuration + offset;
@ -413,6 +450,8 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
nextScheduleFrequency = Math.min(expiryDuration + offset, nextScheduleFrequency);
}
} else {
// Change to debug
logger.info("calling disabledFeatureIfNeeded on " + listenerHolder.feature + " with trialLicense size=" + licensesWrapper.encodedTrialLicenses.size());
listenerHolder.disableFeatureIfNeeded();
}
}
@ -444,15 +483,6 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
}
}
private TrialLicense generateTrialLicense(String feature, int durationInDays, int maxNodes) {
return TrialLicensesBuilder.trialLicenseBuilder()
.issuedTo(clusterService.state().getClusterName().value())
.issueDate(System.currentTimeMillis())
.durationInDays(durationInDays)
.feature(feature)
.maxNodes(maxNodes)
.build();
}
public static class TrialLicenseOptions {
@ -539,13 +569,13 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
public void addSignedLicenses(ESLicenseManager licenseManager, Set<ESLicense> newLicenses) {
Set<ESLicense> currentSignedLicenses = signedLicenses(licenseManager);
final ImmutableMap<String, ESLicense> licenseMap = org.elasticsearch.license.manager.Utils.reduceAndMap(Sets.union(currentSignedLicenses, newLicenses));
final ImmutableMap<String, ESLicense> licenseMap = reduceAndMap(Sets.union(currentSignedLicenses, newLicenses));
this.signatures = licenseManager.toSignatures(licenseMap.values());
}
public void removeFeatures(ESLicenseManager licenseManager, Set<String> featuresToDelete) {
Set<ESLicense> currentSignedLicenses = signedLicenses(licenseManager);
final ImmutableMap<String, ESLicense> licenseMap = org.elasticsearch.license.manager.Utils.reduceAndMap(currentSignedLicenses);
final ImmutableMap<String, ESLicense> licenseMap = reduceAndMap(currentSignedLicenses);
Set<ESLicense> licensesToDelete = new HashSet<>();
for (Map.Entry<String, ESLicense> entry : licenseMap.entrySet()) {
if (featuresToDelete.contains(entry.getKey())) {
@ -561,6 +591,58 @@ public class LicensesService extends AbstractLifecycleComponent<LicensesService>
}
}
private static class RegisterTrialLicenseRequest extends TransportRequest {
private int maxNodes;
private String feature;
private TimeValue duration;
private RegisterTrialLicenseRequest() {
}
private RegisterTrialLicenseRequest(String feature, TimeValue duration, int maxNodes) {
this.maxNodes = maxNodes;
this.feature = feature;
this.duration = duration;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
maxNodes = in.readVInt();
feature = in.readString();
duration = new TimeValue(in.readVLong(), TimeUnit.MILLISECONDS);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(maxNodes);
out.writeString(feature);
out.writeVLong(duration.getMillis());
}
}
private class RegisterTrialLicenseRequestHandler extends BaseTransportRequestHandler<RegisterTrialLicenseRequest> {
@Override
public RegisterTrialLicenseRequest newInstance() {
return new RegisterTrialLicenseRequest();
}
@Override
public void messageReceived(RegisterTrialLicenseRequest request, TransportChannel channel) throws Exception {
registerTrialLicense(request);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}
//Should not be exposed; used by testing only
public void clear() {
if (notificationScheduler != null) {

View File

@ -6,6 +6,7 @@
package org.elasticsearch.license.plugin.core.trial;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.license.core.DateUtils;
import org.elasticsearch.license.core.ESLicense;
@ -96,7 +97,7 @@ public class TrialLicensesBuilder {
private String featureType;
private long expiryDate = -1;
private long issueDate = -1;
private int durationInDays = -1;
private TimeValue duration;
private int maxNodes = -1;
private String uid = null;
private String issuedTo;
@ -129,8 +130,8 @@ public class TrialLicensesBuilder {
return this;
}
public TrialLicenseBuilder durationInDays(int days) {
this.durationInDays = days;
public TrialLicenseBuilder duration(TimeValue duration) {
this.duration = duration;
return this;
}
@ -142,8 +143,7 @@ public class TrialLicensesBuilder {
public TrialLicense build() {
verify();
if (expiryDate == -1) {
assert durationInDays != -1;
expiryDate = DateUtils.expiryDateAfterDays(issueDate, durationInDays);
expiryDate = issueDate + duration.millis();
}
if (uid == null) {
uid = UUID.randomUUID().toString();
@ -190,8 +190,8 @@ public class TrialLicensesBuilder {
msg = "feature has to be set";
} else if (issueDate == -1) {
msg = "issueDate has to be set";
} else if (durationInDays == -1 && expiryDate == -1) {
msg = "durationInDays or expiryDate has to be set";
} else if (duration == null && expiryDate == -1) {
msg = "duration or expiryDate has to be set";
} else if (maxNodes == -1) {
msg = "maxNodes has to be set";
}

View File

@ -38,7 +38,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.SUITE;
@ClusterScope(scope = SUITE, numDataNodes = 10)
@ClusterScope(scope = SUITE, numDataNodes = 3)
public class LicensesServiceTests extends ElasticsearchIntegrationTest {
@ -89,9 +89,6 @@ public class LicensesServiceTests extends ElasticsearchIntegrationTest {
});
latch.await();
clear();
masterClusterService().remove(licensesService());
masterClusterService().add(licensesService());
}
@Test
@ -193,6 +190,7 @@ public class LicensesServiceTests extends ElasticsearchIntegrationTest {
clientService.register("shield", new LicensesService.TrialLicenseOptions(10, 100), new LicensesClientService.Listener() {
@Override
public void onEnabled() {
logger.info("got onEnabled from LicensesClientService");
latch.countDown();
}
@ -201,6 +199,7 @@ public class LicensesServiceTests extends ElasticsearchIntegrationTest {
fail();
}
});
logger.info("waiting for onEnabled");
latch.await();
final LicensesMetaData metaData = clusterService().state().metaData().custom(LicensesMetaData.TYPE);
assertTrue(metaData.getEncodedTrialLicenses().size() == 1);
@ -330,7 +329,7 @@ public class LicensesServiceTests extends ElasticsearchIntegrationTest {
private LicensesClientService licensesClientService() {
final InternalTestCluster clients = internalCluster();
return clients.getInstance(LicensesClientService.class, clients.getMasterName());
return clients.getInstance(LicensesClientService.class);
}
private LicensesService licensesService() {