diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java index fd53401b0b..1dad5d47fc 100644 --- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java @@ -645,28 +645,55 @@ public class StandardValidators { return new ValidationResult.Builder().subject(subject).input(input).valid(true).build(); } }; - } + public static Validator createRegexMatchingValidator(final Pattern pattern) { + return createRegexMatchingValidator(pattern, false, "Value does not match regular expression: " + pattern.pattern()); + } + + public static Validator createRegexMatchingValidator(final Pattern pattern, final boolean evaluateExpressions, final String validationMessage) { return new Validator() { @Override public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + String value = input; if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) { - return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build(); + if (evaluateExpressions) { + try { + value = context.newPropertyValue(input).evaluateAttributeExpressions().getValue(); + } catch (final Exception e) { + return new ValidationResult.Builder() + .subject(subject) + .input(input) + .valid(false) + .explanation("Failed to evaluate the Attribute Expression Language due to " + e.toString()) + .build(); + } + } else { + return new ValidationResult.Builder() + .subject(subject) + .input(input) + .explanation("Expression Language Present") + .valid(true) + .build(); + } } - final boolean matches = pattern.matcher(input).matches(); + final boolean matches = value != null && pattern.matcher(value).matches(); return new ValidationResult.Builder() .input(input) .subject(subject) .valid(matches) - .explanation(matches ? null : "Value does not match regular expression: " + pattern.pattern()) + .explanation(matches ? null : validationMessage) .build(); } }; } + public static Validator createRegexMatchingValidator(final Pattern pattern, final boolean evaluateExpressions) { + return createRegexMatchingValidator(pattern, evaluateExpressions, "Value does not match regular expression: " + pattern.pattern()); + } + /** * Creates a @{link Validator} that ensure that a value is a valid Java * Regular Expression with at least minCapturingGroups diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/validator/TestStandardValidators.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/validator/TestStandardValidators.java index c82c64e341..69bbe9b61e 100644 --- a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/validator/TestStandardValidators.java +++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/validator/TestStandardValidators.java @@ -16,13 +16,19 @@ */ package org.apache.nifi.util.validator; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; +import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; @@ -523,4 +529,88 @@ public class TestStandardValidators { vr = val.validate("foo", "http://localhost , https://host2:8080 ", vc); assertTrue(vr.isValid()); } + + @Test + public void testRegexMatchingValidatorWithoutEL() { + Validator val = StandardValidators.createRegexMatchingValidator(Pattern.compile("^\\?.*$")); + ValidationContext vc = mock(ValidationContext.class); + when(vc.isExpressionLanguagePresent(any())).thenReturn(false); + when(vc.isExpressionLanguageSupported(any())).thenReturn(false); + + validatePropertyIsInvalid(val, null, vc); + + validatePropertyIsInvalid(val, "", vc); + + validatePropertyIsInvalid(val, "invalid string", vc); + + validatePropertyIsValid(val, "?valid string", vc); + } + + @Test + public void testRegexMatchingValidatorWithEL() { + Validator val = StandardValidators.createRegexMatchingValidator(Pattern.compile("^\\?.*$"), true); + ValidationContext vc = mock(ValidationContext.class); + when(vc.isExpressionLanguagePresent(any())).thenReturn(true); + when(vc.isExpressionLanguageSupported(any())).thenReturn(true); + + validatePropertyWithELIsInvalid(val, null, vc); + + validatePropertyWithELIsInvalid(val, "", vc); + + validatePropertyWithELIsInvalid(val, "invalid string", vc); + + validatePropertyWithELIsValid(val, "?valid string", vc); + } + + @Test + public void testRegexMatchingValidatorWithELError() { + Validator val = StandardValidators.createRegexMatchingValidator(Pattern.compile("^\\?.*$"), true); + ValidationContext vc = mock(ValidationContext.class); + when(vc.isExpressionLanguagePresent(any())).thenReturn(true); + when(vc.isExpressionLanguageSupported(any())).thenReturn(true); + + ValidationResult vr = val.validate("foo", "invalid", vc); + assertFalse(vr.isValid()); + assertThat(vr.getExplanation(), containsString("Failed to evaluate the Attribute Expression Language")); + } + + @Test + public void testRegexMatchingValidatorWithELWithoutEvaluation() { + Validator val = StandardValidators.createRegexMatchingValidator(Pattern.compile("^\\?.*$"), false); + ValidationContext vc = mock(ValidationContext.class); + when(vc.isExpressionLanguagePresent(any())).thenReturn(true); + when(vc.isExpressionLanguageSupported(any())).thenReturn(true); + + ValidationResult vr = val.validate("foo", "valid", vc); + assertTrue(vr.isValid()); + assertEquals("Expression Language Present", vr.getExplanation()); + } + + private void validatePropertyIsValid(final Validator val, final String input, final ValidationContext vc) { + ValidationResult vr = val.validate("foo", input, vc); + assertTrue(vr.isValid()); + } + + private void validatePropertyIsInvalid(final Validator val, final String input, final ValidationContext vc) { + ValidationResult vr = val.validate("foo", input, vc); + assertFalse(vr.isValid()); + } + + private void validatePropertyWithELIsValid(Validator val, String input, ValidationContext vc) { + PropertyValue property = mock(PropertyValue.class); + when(vc.newPropertyValue(input)).thenReturn(property); + when(property.evaluateAttributeExpressions()).thenReturn(property); + when(property.evaluateAttributeExpressions().getValue()).thenReturn(input); + ValidationResult vr = val.validate("foo", input, vc); + assertTrue(vr.isValid()); + } + + private void validatePropertyWithELIsInvalid(Validator val, String input, ValidationContext vc) { + PropertyValue property = mock(PropertyValue.class); + when(vc.newPropertyValue(input)).thenReturn(property); + when(property.evaluateAttributeExpressions()).thenReturn(property); + when(property.evaluateAttributeExpressions().getValue()).thenReturn(input); + ValidationResult vr = val.validate("foo", input, vc); + assertFalse(vr.isValid()); + } } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java index da3f3cdc54..2b0f2e3f43 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java @@ -74,6 +74,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; import static org.apache.nifi.util.StringUtils.isEmpty; @@ -91,7 +92,9 @@ import static org.apache.nifi.util.StringUtils.isEmpty; }) public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor { - private static final String FORMAT_STORAGE_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s"; + private static final Pattern SAS_TOKEN_PATTERN = Pattern.compile("^\\?.*$"); + private static final String FORMAT_STORAGE_CONNECTION_STRING_FOR_ACCOUNT_KEY = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s"; + private static final String FORMAT_STORAGE_CONNECTION_STRING_FOR_SAS_TOKEN = "BlobEndpoint=https://%s.blob.core.windows.net/;SharedAccessSignature=%s"; static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder() .name("event-hub-namespace") @@ -228,7 +231,17 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor { .sensitive(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .required(true) + .required(false) + .build(); + static final PropertyDescriptor STORAGE_SAS_TOKEN = new PropertyDescriptor.Builder() + .name("storage-sas-token") + .displayName("Storage SAS Token") + .description("The Azure Storage SAS token to store Event Hub consumer group state. Always starts with a ? character.") + .sensitive(true) + .addValidator(StandardValidators.createRegexMatchingValidator(SAS_TOKEN_PATTERN, true, + "Token must start with a ? character.")) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .required(false) .build(); static final PropertyDescriptor STORAGE_CONTAINER_NAME = new PropertyDescriptor.Builder() .name("storage-container-name") @@ -261,7 +274,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor { NAMESPACE, EVENT_HUB_NAME, ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY, USE_MANAGED_IDENTITY, CONSUMER_GROUP, CONSUMER_HOSTNAME, RECORD_READER, RECORD_WRITER, INITIAL_OFFSET, PREFETCH_COUNT, BATCH_SIZE, RECEIVE_TIMEOUT, - STORAGE_ACCOUNT_NAME, STORAGE_ACCOUNT_KEY, STORAGE_CONTAINER_NAME + STORAGE_ACCOUNT_NAME, STORAGE_ACCOUNT_KEY, STORAGE_SAS_TOKEN, STORAGE_CONTAINER_NAME )); Set relationships = new HashSet<>(); @@ -324,6 +337,9 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor { final List results = new ArrayList<>(); final ControllerService recordReader = validationContext.getProperty(RECORD_READER).asControllerService(); final ControllerService recordWriter = validationContext.getProperty(RECORD_WRITER).asControllerService(); + final String storageAccountKey = validationContext.getProperty(STORAGE_ACCOUNT_KEY).evaluateAttributeExpressions().getValue(); + final String storageSasToken = validationContext.getProperty(STORAGE_SAS_TOKEN).evaluateAttributeExpressions().getValue(); + if ((recordReader != null && recordWriter == null) || (recordReader == null && recordWriter != null)) { results.add(new ValidationResult.Builder() .subject("Record Reader and Writer") @@ -332,6 +348,26 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor { .valid(false) .build()); } + + if (StringUtils.isBlank(storageAccountKey) && StringUtils.isBlank(storageSasToken)) { + results.add(new ValidationResult.Builder() + .subject(String.format("%s or %s", + STORAGE_ACCOUNT_KEY.getDisplayName(), STORAGE_SAS_TOKEN.getDisplayName())) + .explanation(String.format("either %s or %s should be set.", + STORAGE_ACCOUNT_KEY.getDisplayName(), STORAGE_SAS_TOKEN.getDisplayName())) + .valid(false) + .build()); + } + + if (StringUtils.isNotBlank(storageAccountKey) && StringUtils.isNotBlank(storageSasToken)) { + results.add(new ValidationResult.Builder() + .subject(String.format("%s or %s", + STORAGE_ACCOUNT_KEY.getDisplayName(), STORAGE_SAS_TOKEN.getDisplayName())) + .explanation(String.format("%s and %s should not be set at the same time.", + STORAGE_ACCOUNT_KEY.getDisplayName(), STORAGE_SAS_TOKEN.getDisplayName())) + .valid(false) + .build()); + } results.addAll(AzureEventHubUtils.customValidate(ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY, validationContext)); return results; } @@ -579,12 +615,6 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor { final String eventHubName = context.getProperty(EVENT_HUB_NAME).evaluateAttributeExpressions().getValue(); validateRequiredProperty(EVENT_HUB_NAME, eventHubName); - final String storageAccountName = context.getProperty(STORAGE_ACCOUNT_NAME).evaluateAttributeExpressions().getValue(); - validateRequiredProperty(STORAGE_ACCOUNT_NAME, storageAccountName); - - final String storageAccountKey = context.getProperty(STORAGE_ACCOUNT_KEY).evaluateAttributeExpressions().getValue(); - validateRequiredProperty(STORAGE_ACCOUNT_KEY, storageAccountKey); - final String consumerHostname = orDefault(context.getProperty(CONSUMER_HOSTNAME).evaluateAttributeExpressions().getValue(), EventProcessorHost.createHostName("nifi")); @@ -617,7 +647,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor { .evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS); options.setReceiveTimeOut(Duration.ofMillis(receiveTimeoutMillis)); - final String storageConnectionString = String.format(FORMAT_STORAGE_CONNECTION_STRING, storageAccountName, storageAccountKey); + final String storageConnectionString = createStorageConnectionString(context); final String connectionString; final boolean useManagedIdentity = context.getProperty(USE_MANAGED_IDENTITY).asBoolean(); @@ -630,6 +660,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor { validateRequiredProperty(POLICY_PRIMARY_KEY, sasKey); connectionString = AzureEventHubUtils.getSharedAccessSignatureConnectionString(namespaceName, eventHubName, sasName, sasKey); } + eventProcessorHost = EventProcessorHost.EventProcessorHostBuilder .newBuilder(consumerHostname, consumerGroupName) .useAzureStorageCheckpointLeaseManager(storageConnectionString, containerName, null) @@ -645,6 +676,19 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor { eventProcessorHost.registerEventProcessorFactory(new EventProcessorFactory(), options).get(); } + private String createStorageConnectionString(final ProcessContext context) { + final String storageAccountName = context.getProperty(STORAGE_ACCOUNT_NAME).evaluateAttributeExpressions().getValue(); + validateRequiredProperty(STORAGE_ACCOUNT_NAME, storageAccountName); + + final String storageAccountKey = context.getProperty(STORAGE_ACCOUNT_KEY).evaluateAttributeExpressions().getValue(); + final String storageSasToken = context.getProperty(STORAGE_SAS_TOKEN).evaluateAttributeExpressions().getValue(); + + if (storageAccountKey != null) { + return String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_ACCOUNT_KEY, storageAccountName, storageAccountKey); + } + return String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_SAS_TOKEN, storageAccountName, storageSasToken); + } + private String orDefault(String value, String defaultValue) { return isEmpty(value) ? defaultValue : value; } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java index 17f844289e..aef8b4a091 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java @@ -73,8 +73,11 @@ import org.apache.nifi.serialization.record.MockRecordWriter; public class TestConsumeAzureEventHub { private static final String namespaceName = "nifi-azure-hub"; private static final String eventHubName = "get-test"; + private static final String policyName = "test-pn"; + private static final String policyKey = "test-pk"; private static final String storageAccountName = "test-sa"; private static final String storageAccountKey = "test-sa-key"; + private static final String storageSasToken = "?test-sa-token"; private ConsumeAzureEventHub.EventProcessor eventProcessor; private MockProcessSession processSession; @@ -132,12 +135,72 @@ public class TestConsumeAzureEventHub { testRunner.assertValid(); } + @Test + public void testProcessorConfigValidityWithNeitherStorageKeyNorTokenSet() { + TestRunner testRunner = TestRunners.newTestRunner(processor); + testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME,eventHubName); + testRunner.assertNotValid(); + testRunner.setProperty(ConsumeAzureEventHub.NAMESPACE,namespaceName); + testRunner.assertNotValid(); + testRunner.setProperty(ConsumeAzureEventHub.ACCESS_POLICY_NAME, policyName); + testRunner.setProperty(ConsumeAzureEventHub.POLICY_PRIMARY_KEY, policyKey); + testRunner.assertNotValid(); + testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_NAME, storageAccountName); + testRunner.assertNotValid(); + } + + @Test + public void testProcessorConfigValidityWithBothStorageKeyAndTokenSet() { + TestRunner testRunner = TestRunners.newTestRunner(processor); + testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME,eventHubName); + testRunner.assertNotValid(); + testRunner.setProperty(ConsumeAzureEventHub.NAMESPACE,namespaceName); + testRunner.assertNotValid(); + testRunner.setProperty(ConsumeAzureEventHub.ACCESS_POLICY_NAME, policyName); + testRunner.setProperty(ConsumeAzureEventHub.POLICY_PRIMARY_KEY, policyKey); + testRunner.assertNotValid(); + testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_NAME, storageAccountName); + testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_KEY, storageAccountKey); + testRunner.setProperty(ConsumeAzureEventHub.STORAGE_SAS_TOKEN, storageSasToken); + testRunner.assertNotValid(); + } + + @Test + public void testProcessorConfigValidityWithTokenSet() { + TestRunner testRunner = TestRunners.newTestRunner(processor); + testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME,eventHubName); + testRunner.assertNotValid(); + testRunner.setProperty(ConsumeAzureEventHub.NAMESPACE,namespaceName); + testRunner.assertNotValid(); + testRunner.setProperty(ConsumeAzureEventHub.ACCESS_POLICY_NAME, policyName); + testRunner.setProperty(ConsumeAzureEventHub.POLICY_PRIMARY_KEY, policyKey); + testRunner.assertNotValid(); + testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_NAME, storageAccountName); + testRunner.setProperty(ConsumeAzureEventHub.STORAGE_SAS_TOKEN, storageSasToken); + testRunner.assertValid(); + } + + @Test + public void testProcessorConfigValidityWithStorageKeySet() { + TestRunner testRunner = TestRunners.newTestRunner(processor); + testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME,eventHubName); + testRunner.assertNotValid(); + testRunner.setProperty(ConsumeAzureEventHub.NAMESPACE,namespaceName); + testRunner.assertNotValid(); + testRunner.setProperty(ConsumeAzureEventHub.ACCESS_POLICY_NAME, policyName); + testRunner.setProperty(ConsumeAzureEventHub.POLICY_PRIMARY_KEY, policyKey); + testRunner.assertNotValid(); + testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_NAME, storageAccountName); + testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_KEY, storageAccountKey); + testRunner.assertValid(); + } + @Test public void testReceivedApplicationProperties() throws Exception { final EventData singleEvent = EventData.create("one".getBytes(StandardCharsets.UTF_8)); singleEvent.getProperties().put("event-sender", "Apache NiFi"); singleEvent.getProperties().put("application", "TestApp"); - final Iterable eventDataList = Arrays.asList(singleEvent); + final Iterable eventDataList = Collections.singletonList(singleEvent); eventProcessor.onEvents(partitionContext, eventDataList); processSession.assertCommitted(); @@ -150,7 +213,7 @@ public class TestConsumeAzureEventHub { @Test public void testReceiveOne() throws Exception { - final Iterable eventDataList = Arrays.asList(EventData.create("one".getBytes(StandardCharsets.UTF_8))); + final Iterable eventDataList = Collections.singletonList(EventData.create("one".getBytes(StandardCharsets.UTF_8))); eventProcessor.onEvents(partitionContext, eventDataList); processSession.assertCommitted(); @@ -262,7 +325,7 @@ public class TestConsumeAzureEventHub { .collect(Collectors.toList()); final List recordSetList = addEndRecord.apply(recordList); - final Record[] records = recordSetList.toArray(new Record[recordSetList.size()]); + final Record[] records = recordSetList.toArray(new Record[0]); switch (throwExceptionAt) { case -1: @@ -277,8 +340,8 @@ public class TestConsumeAzureEventHub { default: final List recordList1 = addEndRecord.apply(recordList.subList(0, throwExceptionAt)); final List recordList2 = addEndRecord.apply(recordList.subList(throwExceptionAt + 1, recordList.size())); - final Record[] records1 = recordList1.toArray(new Record[recordList1.size()]); - final Record[] records2 = recordList2.toArray(new Record[recordList2.size()]); + final Record[] records1 = recordList1.toArray(new Record[0]); + final Record[] records2 = recordList2.toArray(new Record[0]); when(reader.nextRecord()) .thenReturn(records1[0], Arrays.copyOfRange(records1, 1, records1.length)) .thenThrow(new MalformedRecordException("Simulating Record parse failure."))