diff --git a/src/main/java/org/elasticsearch/license/plugin/core/LicensesMetaData.java b/src/main/java/org/elasticsearch/license/plugin/core/LicensesMetaData.java index 2cc6578e945..29c053fbab8 100644 --- a/src/main/java/org/elasticsearch/license/plugin/core/LicensesMetaData.java +++ b/src/main/java/org/elasticsearch/license/plugin/core/LicensesMetaData.java @@ -55,6 +55,34 @@ public class LicensesMetaData implements MetaData.Custom { return encodedTrialLicenses; } + @Override + public boolean equals(Object obj) { + if (obj instanceof LicensesMetaData) { + LicensesMetaData other = (LicensesMetaData) obj; + boolean signaturesEqual = false; + boolean trialLicensesEqual = false; + + if (other.getSignatures() != null) { + if (this.getSignatures() != null) { + signaturesEqual = other.getSignatures().equals(this.getSignatures()); + } else { + return false; + } + } + + if (other.getEncodedTrialLicenses() != null) { + if (this.getEncodedTrialLicenses() != null) { + trialLicensesEqual = other.getEncodedTrialLicenses().equals(this.getEncodedTrialLicenses()); + } else { + return false; + } + } + + return signaturesEqual && trialLicensesEqual; + } + return false; + } + /** * Licenses metadata factory */ diff --git a/src/main/java/org/elasticsearch/license/plugin/core/LicensesService.java b/src/main/java/org/elasticsearch/license/plugin/core/LicensesService.java index 8280976eb34..fb17db0cb9c 100644 --- a/src/main/java/org/elasticsearch/license/plugin/core/LicensesService.java +++ b/src/main/java/org/elasticsearch/license/plugin/core/LicensesService.java @@ -20,6 +20,8 @@ import org.elasticsearch.common.collect.Sets; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Singleton; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; @@ -32,12 +34,16 @@ import org.elasticsearch.license.plugin.core.trial.TrialLicenseUtils; import org.elasticsearch.license.plugin.core.trial.TrialLicenses; import org.elasticsearch.license.plugin.core.trial.TrialLicensesBuilder; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.*; +import java.io.IOException; import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static org.elasticsearch.license.manager.Utils.reduceAndMap; import static org.elasticsearch.license.plugin.core.trial.TrialLicenses.TrialLicense; /** @@ -47,26 +53,33 @@ import static org.elasticsearch.license.plugin.core.trial.TrialLicenses.TrialLic * - LicensesClientService - allow interested plugins (features) to register to licensing notifications * * TODO: documentation + * TODO: figure out when to check GatewayService.STATE_NOT_RECOVERED_BLOCK */ @Singleton public class LicensesService extends AbstractLifecycleComponent implements ClusterStateListener, LicensesManagerService, LicensesClientService { - private ESLicenseManager esLicenseManager; + public static final String REGISTER_TRIAL_LICENSE_ACTION_NAME = "internal:cluster/licenses/register_trial_license"; - private ClusterService clusterService; + private final ESLicenseManager esLicenseManager; - private ThreadPool threadPool; + private final ClusterService clusterService; + + private final ThreadPool threadPool; + + private final TransportService transportService; private List registeredListeners = new CopyOnWriteArrayList<>(); private volatile ScheduledFuture notificationScheduler; @Inject - public LicensesService(Settings settings, ClusterService clusterService, ThreadPool threadPool) { + public LicensesService(Settings settings, ClusterService clusterService, ThreadPool threadPool, TransportService transportService) { super(settings); this.clusterService = clusterService; this.esLicenseManager = new ESLicenseManager(); this.threadPool = threadPool; + this.transportService = transportService; + transportService.registerHandler(REGISTER_TRIAL_LICENSE_ACTION_NAME, new RegisterTrialLicenseRequestHandler()); } /** @@ -134,7 +147,7 @@ public class LicensesService extends AbstractLifecycleComponent @Override public LicensesStatus checkLicenses(Set licenses) { - final ImmutableMap map = org.elasticsearch.license.manager.Utils.reduceAndMap(licenses); + final ImmutableMap map = reduceAndMap(licenses); return checkLicenses(map); } @@ -191,21 +204,27 @@ public class LicensesService extends AbstractLifecycleComponent } - private void registerTrialLicense(final TrialLicense trialLicense) { + private void registerTrialLicense(final RegisterTrialLicenseRequest request) { clusterService.submitStateUpdateTask("register trial license []", new ProcessedClusterStateUpdateTask() { @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + // Change to debug + logger.info("Processed Trial License registration"); + LicensesMetaData licensesMetaData = newState.metaData().custom(LicensesMetaData.TYPE); + if (licensesMetaData != null) { + logger.info("New state: signedLicenses: " + licensesMetaData.getSignatures().size() + " trialLicenses: " + licensesMetaData.getEncodedTrialLicenses().size()); + } } @Override public ClusterState execute(ClusterState currentState) throws Exception { MetaData metaData = currentState.metaData(); MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData()); - LicensesMetaData currentLicenses = metaData.custom(LicensesMetaData.TYPE); - final LicensesWrapper licensesWrapper = LicensesWrapper.wrap(currentLicenses); + LicensesMetaData currentLicensesMetaData = metaData.custom(LicensesMetaData.TYPE); + final LicensesWrapper licensesWrapper = LicensesWrapper.wrap(currentLicensesMetaData); // do not generate a trial license for a feature that already has a signed license - if (!esLicenseManager.hasLicenseForFeature(trialLicense.feature(), getEffectiveLicenses())) { - licensesWrapper.addTrialLicense(trialLicense); + if (!esLicenseManager.hasLicenseForFeature(request.feature, getEffectiveLicenses())) { + licensesWrapper.addTrialLicense(generateTrialLicense(request.feature, request.duration, request.maxNodes)); } mdBuilder.putCustom(LicensesMetaData.TYPE, licensesWrapper.createLicensesMetaData()); return ClusterState.builder(currentState).metaData(mdBuilder).build(); @@ -215,7 +234,19 @@ public class LicensesService extends AbstractLifecycleComponent public void onFailure(String source, @Nullable Throwable t) { logger.info("LicensesService: " + source, t); } + + private TrialLicense generateTrialLicense(String feature, TimeValue duration, int maxNodes) { + return TrialLicensesBuilder.trialLicenseBuilder() + .issuedTo(clusterService.state().getClusterName().value()) + .issueDate(System.currentTimeMillis()) + .duration(duration) + .feature(feature) + .maxNodes(maxNodes) + .build(); + } }); + + } @Override @@ -251,7 +282,25 @@ public class LicensesService extends AbstractLifecycleComponent @Override public void clusterChanged(ClusterChangedEvent event) { if (!event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { - performMasterNodeOperations(event); + + // notify all interested plugins + if (checkIfUpdatedMetaData(event)) { + final LicensesMetaData currentLicensesMetaData = event.state().getMetaData().custom(LicensesMetaData.TYPE); + // Change to debug + if (currentLicensesMetaData != null) { + logger.info("LicensesMetaData: signedLicenses: " + currentLicensesMetaData.getSignatures().size() + " trialLicenses: " + currentLicensesMetaData.getEncodedTrialLicenses().size()); + } else { + logger.info("LicensesMetaData: signedLicenses: 0 trialLicenses: 0"); + } + + // Change to debug + logger.info("calling notifyFeatures from clusterChanged"); + long nextScheduleFrequency = notifyFeatures(currentLicensesMetaData); + if (notificationScheduler == null) { + notificationScheduler = threadPool.schedule(TimeValue.timeValueMillis(nextScheduleFrequency), executorName(), + new SubmitReschedulingLicensingClientNotificationJob()); + } + } } } @@ -267,10 +316,8 @@ public class LicensesService extends AbstractLifecycleComponent registeredListeners.add(new ListenerHolder(feature, trialLicenseOptions, listener)); // DO we need to check STATE_NOT_RECOVERED_BLOCK here - if (clusterService.state().nodes().localNodeMaster()) { - LicensesMetaData currentMetaData = clusterService.state().metaData().custom(LicensesMetaData.TYPE); - registerListeners(currentMetaData); - } + LicensesMetaData currentMetaData = clusterService.state().metaData().custom(LicensesMetaData.TYPE); + registerListeners(currentMetaData); } private void registerListeners(LicensesMetaData currentMetaData) { @@ -281,10 +328,18 @@ public class LicensesService extends AbstractLifecycleComponent TrialLicenseOptions options = listenerHolder.trialLicenseOptions; if (options != null) { // Trial license option is provided - TrialLicense trialLicense = generateTrialLicense(listenerHolder.feature, options.durationInDays, options.maxNodes); - registerTrialLicense(trialLicense); + RegisterTrialLicenseRequest request = new RegisterTrialLicenseRequest(listenerHolder.feature, + new TimeValue(options.durationInDays, TimeUnit.DAYS), options.maxNodes); + if (clusterService.state().nodes().localNodeMaster()) { + registerTrialLicense(request); + } else { + transportService.sendRequest(clusterService.state().nodes().masterNode(), + REGISTER_TRIAL_LICENSE_ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME); + } } else { // notify feature as clusterChangedEvent may not happen + // Change to debug + logger.info("calling notifyFeatures from registerListeners"); notifyFeatures(currentMetaData); } } @@ -292,24 +347,6 @@ public class LicensesService extends AbstractLifecycleComponent } } - private void performMasterNodeOperations(ClusterChangedEvent event) { - if (event.state().nodes().localNodeMaster()) { - final LicensesMetaData currentLicensesMetaData = event.state().getMetaData().custom(LicensesMetaData.TYPE); - - // register all interested plugins - registerListeners(currentLicensesMetaData); - - // notify all interested plugins - if (currentLicensesMetaData != null && checkIfUpdatedMetaData(event)) { - long nextScheduleFrequency = notifyFeatures(currentLicensesMetaData); - if (notificationScheduler == null) { - notificationScheduler = threadPool.schedule(TimeValue.timeValueMillis(nextScheduleFrequency), executorName(), - new SubmitReschedulingLicensingClientNotificationJob()); - } - } - } - } - private String executorName() { return ThreadPool.Names.GENERIC; } @@ -324,7 +361,7 @@ public class LicensesService extends AbstractLifecycleComponent for (String signature : metaData.getSignatures()) { esLicenses.add(esLicenseManager.fromSignature(signature)); } - return org.elasticsearch.license.manager.Utils.reduceAndMap(esLicenses); + return reduceAndMap(esLicenses); } return ImmutableMap.copyOf(map); @@ -368,11 +405,9 @@ public class LicensesService extends AbstractLifecycleComponent if (clusterService.state().nodes().localNodeMaster()) { LicensesMetaData currentLicensesMetaData = clusterService.state().metaData().custom(LicensesMetaData.TYPE); - long nextScheduleFrequency = TimeValue.timeValueMinutes(5).getMillis(); - if (currentLicensesMetaData != null) { - nextScheduleFrequency = Math.max(nextScheduleFrequency, notifyFeatures(currentLicensesMetaData)); - } - + // Change to debug + logger.info("calling notifyFeatures from LicensingClientNotificationJob"); + long nextScheduleFrequency = Math.max(TimeValue.timeValueMinutes(5).getMillis(), notifyFeatures(currentLicensesMetaData)); TimeValue updateFrequency = TimeValue.timeValueMillis(nextScheduleFrequency); if (this.reschedule) { @@ -406,6 +441,8 @@ public class LicensesService extends AbstractLifecycleComponent } long expiryDuration = expiryDate - System.currentTimeMillis(); if (expiryDuration > 0l) { + // Change to debug + logger.info("calling enabledFeatureIfNeeded on " + listenerHolder.feature + " with trialLicense size=" + licensesWrapper.encodedTrialLicenses.size()); listenerHolder.enableFeatureIfNeeded(); if (nextScheduleFrequency == -1l) { nextScheduleFrequency = expiryDuration + offset; @@ -413,6 +450,8 @@ public class LicensesService extends AbstractLifecycleComponent nextScheduleFrequency = Math.min(expiryDuration + offset, nextScheduleFrequency); } } else { + // Change to debug + logger.info("calling disabledFeatureIfNeeded on " + listenerHolder.feature + " with trialLicense size=" + licensesWrapper.encodedTrialLicenses.size()); listenerHolder.disableFeatureIfNeeded(); } } @@ -444,15 +483,6 @@ public class LicensesService extends AbstractLifecycleComponent } } - private TrialLicense generateTrialLicense(String feature, int durationInDays, int maxNodes) { - return TrialLicensesBuilder.trialLicenseBuilder() - .issuedTo(clusterService.state().getClusterName().value()) - .issueDate(System.currentTimeMillis()) - .durationInDays(durationInDays) - .feature(feature) - .maxNodes(maxNodes) - .build(); - } public static class TrialLicenseOptions { @@ -539,13 +569,13 @@ public class LicensesService extends AbstractLifecycleComponent public void addSignedLicenses(ESLicenseManager licenseManager, Set newLicenses) { Set currentSignedLicenses = signedLicenses(licenseManager); - final ImmutableMap licenseMap = org.elasticsearch.license.manager.Utils.reduceAndMap(Sets.union(currentSignedLicenses, newLicenses)); + final ImmutableMap licenseMap = reduceAndMap(Sets.union(currentSignedLicenses, newLicenses)); this.signatures = licenseManager.toSignatures(licenseMap.values()); } public void removeFeatures(ESLicenseManager licenseManager, Set featuresToDelete) { Set currentSignedLicenses = signedLicenses(licenseManager); - final ImmutableMap licenseMap = org.elasticsearch.license.manager.Utils.reduceAndMap(currentSignedLicenses); + final ImmutableMap licenseMap = reduceAndMap(currentSignedLicenses); Set licensesToDelete = new HashSet<>(); for (Map.Entry entry : licenseMap.entrySet()) { if (featuresToDelete.contains(entry.getKey())) { @@ -561,6 +591,58 @@ public class LicensesService extends AbstractLifecycleComponent } } + + private static class RegisterTrialLicenseRequest extends TransportRequest { + private int maxNodes; + private String feature; + private TimeValue duration; + + private RegisterTrialLicenseRequest() { + } + + private RegisterTrialLicenseRequest(String feature, TimeValue duration, int maxNodes) { + this.maxNodes = maxNodes; + this.feature = feature; + this.duration = duration; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + maxNodes = in.readVInt(); + feature = in.readString(); + duration = new TimeValue(in.readVLong(), TimeUnit.MILLISECONDS); + } + + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVInt(maxNodes); + out.writeString(feature); + out.writeVLong(duration.getMillis()); + } + } + + + private class RegisterTrialLicenseRequestHandler extends BaseTransportRequestHandler { + @Override + public RegisterTrialLicenseRequest newInstance() { + return new RegisterTrialLicenseRequest(); + } + + @Override + public void messageReceived(RegisterTrialLicenseRequest request, TransportChannel channel) throws Exception { + registerTrialLicense(request); + channel.sendResponse(TransportResponse.Empty.INSTANCE); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + } + //Should not be exposed; used by testing only public void clear() { if (notificationScheduler != null) { diff --git a/src/main/java/org/elasticsearch/license/plugin/core/trial/TrialLicensesBuilder.java b/src/main/java/org/elasticsearch/license/plugin/core/trial/TrialLicensesBuilder.java index 56039ce9367..863f7dba73e 100644 --- a/src/main/java/org/elasticsearch/license/plugin/core/trial/TrialLicensesBuilder.java +++ b/src/main/java/org/elasticsearch/license/plugin/core/trial/TrialLicensesBuilder.java @@ -6,6 +6,7 @@ package org.elasticsearch.license.plugin.core.trial; import org.elasticsearch.common.collect.ImmutableMap; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.license.core.DateUtils; import org.elasticsearch.license.core.ESLicense; @@ -96,7 +97,7 @@ public class TrialLicensesBuilder { private String featureType; private long expiryDate = -1; private long issueDate = -1; - private int durationInDays = -1; + private TimeValue duration; private int maxNodes = -1; private String uid = null; private String issuedTo; @@ -129,8 +130,8 @@ public class TrialLicensesBuilder { return this; } - public TrialLicenseBuilder durationInDays(int days) { - this.durationInDays = days; + public TrialLicenseBuilder duration(TimeValue duration) { + this.duration = duration; return this; } @@ -142,8 +143,7 @@ public class TrialLicensesBuilder { public TrialLicense build() { verify(); if (expiryDate == -1) { - assert durationInDays != -1; - expiryDate = DateUtils.expiryDateAfterDays(issueDate, durationInDays); + expiryDate = issueDate + duration.millis(); } if (uid == null) { uid = UUID.randomUUID().toString(); @@ -190,8 +190,8 @@ public class TrialLicensesBuilder { msg = "feature has to be set"; } else if (issueDate == -1) { msg = "issueDate has to be set"; - } else if (durationInDays == -1 && expiryDate == -1) { - msg = "durationInDays or expiryDate has to be set"; + } else if (duration == null && expiryDate == -1) { + msg = "duration or expiryDate has to be set"; } else if (maxNodes == -1) { msg = "maxNodes has to be set"; } diff --git a/src/test/java/org/elasticsearch/license/plugin/LicensesServiceTests.java b/src/test/java/org/elasticsearch/license/plugin/LicensesServiceTests.java index 66391e16f5c..0a152c3dd70 100644 --- a/src/test/java/org/elasticsearch/license/plugin/LicensesServiceTests.java +++ b/src/test/java/org/elasticsearch/license/plugin/LicensesServiceTests.java @@ -38,7 +38,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.SUITE; -@ClusterScope(scope = SUITE, numDataNodes = 10) +@ClusterScope(scope = SUITE, numDataNodes = 3) public class LicensesServiceTests extends ElasticsearchIntegrationTest { @@ -89,9 +89,6 @@ public class LicensesServiceTests extends ElasticsearchIntegrationTest { }); latch.await(); clear(); - masterClusterService().remove(licensesService()); - masterClusterService().add(licensesService()); - } @Test @@ -193,6 +190,7 @@ public class LicensesServiceTests extends ElasticsearchIntegrationTest { clientService.register("shield", new LicensesService.TrialLicenseOptions(10, 100), new LicensesClientService.Listener() { @Override public void onEnabled() { + logger.info("got onEnabled from LicensesClientService"); latch.countDown(); } @@ -201,6 +199,7 @@ public class LicensesServiceTests extends ElasticsearchIntegrationTest { fail(); } }); + logger.info("waiting for onEnabled"); latch.await(); final LicensesMetaData metaData = clusterService().state().metaData().custom(LicensesMetaData.TYPE); assertTrue(metaData.getEncodedTrialLicenses().size() == 1); @@ -330,7 +329,7 @@ public class LicensesServiceTests extends ElasticsearchIntegrationTest { private LicensesClientService licensesClientService() { final InternalTestCluster clients = internalCluster(); - return clients.getInstance(LicensesClientService.class, clients.getMasterName()); + return clients.getInstance(LicensesClientService.class); } private LicensesService licensesService() {