NIFI-8668 ConsumeAzureEventHub NiFi processors need to support storage SAS token authentication

This closes #5136.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Timea Barna 2021-06-08 07:56:38 +02:00 committed by Peter Turcsanyi
parent 0a827d6cd7
commit 462306369f
4 changed files with 243 additions and 19 deletions

View File

@ -645,28 +645,55 @@ public class StandardValidators {
return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
}
};
}
/**
 * Creates a {@link Validator} for the given regular expression using the default settings:
 * Expression Language evaluation is turned off and the explanation for a non-matching value
 * quotes the pattern.
 *
 * @param pattern the regular expression to validate against
 * @return a validator built by the three-argument overload
 */
public static Validator createRegexMatchingValidator(final Pattern pattern) {
    final String defaultExplanation = "Value does not match regular expression: " + pattern.pattern();
    return createRegexMatchingValidator(pattern, false, defaultExplanation);
}
/**
 * Creates a {@link Validator} that accepts a value only if it matches the given regular
 * expression in full.
 *
 * <p>When the property supports Expression Language and the input contains an expression:
 * <ul>
 *   <li>if {@code evaluateExpressions} is {@code false}, the input is reported as valid with the
 *       explanation "Expression Language Present" and the pattern is not applied;</li>
 *   <li>if {@code evaluateExpressions} is {@code true}, the expression is evaluated and the
 *       pattern is applied to the evaluated value; an evaluation failure yields an invalid
 *       result describing the exception.</li>
 * </ul>
 * A {@code null} value (raw or evaluated) never matches and is reported as invalid.
 *
 * @param pattern the regular expression the whole value must match
 * @param evaluateExpressions whether Expression Language in the input is evaluated before matching
 * @param validationMessage the explanation reported when the value does not match
 * @return the validator
 */
public static Validator createRegexMatchingValidator(final Pattern pattern, final boolean evaluateExpressions, final String validationMessage) {
    return new Validator() {
        @Override
        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
            String value = input;

            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
                if (!evaluateExpressions) {
                    // The expression is left unevaluated, so the input is accepted as-is.
                    return new ValidationResult.Builder()
                            .subject(subject)
                            .input(input)
                            .explanation("Expression Language Present")
                            .valid(true)
                            .build();
                }

                try {
                    value = context.newPropertyValue(input).evaluateAttributeExpressions().getValue();
                } catch (final Exception e) {
                    return new ValidationResult.Builder()
                            .subject(subject)
                            .input(input)
                            .valid(false)
                            .explanation("Failed to evaluate the Attribute Expression Language due to " + e.toString())
                            .build();
                }
            }

            // Guard against null: Pattern.matcher(null) would throw a NullPointerException.
            final boolean matches = value != null && pattern.matcher(value).matches();
            return new ValidationResult.Builder()
                    .input(input)
                    .subject(subject)
                    .valid(matches)
                    .explanation(matches ? null : validationMessage)
                    .build();
        }
    };
}
/**
 * Creates a {@link Validator} for the given regular expression with the default explanation
 * (which quotes the pattern) for non-matching values.
 *
 * @param pattern the regular expression to validate against
 * @param evaluateExpressions passed through unchanged to the three-argument overload
 * @return a validator built by the three-argument overload
 */
public static Validator createRegexMatchingValidator(final Pattern pattern, final boolean evaluateExpressions) {
    final String defaultExplanation = "Value does not match regular expression: " + pattern.pattern();
    return createRegexMatchingValidator(pattern, evaluateExpressions, defaultExplanation);
}
/**
* Creates a {@link Validator} that ensures that a value is a valid Java
* Regular Expression with at least <code>minCapturingGroups</code>

View File

@ -16,13 +16,19 @@
*/
package org.apache.nifi.util.validator;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
@ -523,4 +529,88 @@ public class TestStandardValidators {
vr = val.validate("foo", "http://localhost , https://host2:8080 ", vc);
assertTrue(vr.isValid());
}
/**
 * With Expression Language neither supported nor present, only an input that matches the
 * pattern (here: starts with '?') is valid; null, empty and non-matching inputs are invalid.
 */
@Test
public void testRegexMatchingValidatorWithoutEL() {
    final Pattern leadingQuestionMark = Pattern.compile("^\\?.*$");
    final Validator validator = StandardValidators.createRegexMatchingValidator(leadingQuestionMark);

    final ValidationContext context = mock(ValidationContext.class);
    when(context.isExpressionLanguagePresent(any())).thenReturn(false);
    when(context.isExpressionLanguageSupported(any())).thenReturn(false);

    validatePropertyIsInvalid(validator, null, context);
    validatePropertyIsInvalid(validator, "", context);
    validatePropertyIsInvalid(validator, "invalid string", context);
    validatePropertyIsValid(validator, "?valid string", context);
}
/**
 * With Expression Language evaluation enabled, the pattern is applied to the evaluated value
 * (the helpers stub evaluation to return the input unchanged).
 */
@Test
public void testRegexMatchingValidatorWithEL() {
    final ValidationContext context = mock(ValidationContext.class);
    when(context.isExpressionLanguagePresent(any())).thenReturn(true);
    when(context.isExpressionLanguageSupported(any())).thenReturn(true);

    final Pattern leadingQuestionMark = Pattern.compile("^\\?.*$");
    final Validator validator = StandardValidators.createRegexMatchingValidator(leadingQuestionMark, true);

    validatePropertyWithELIsInvalid(validator, null, context);
    validatePropertyWithELIsInvalid(validator, "", context);
    validatePropertyWithELIsInvalid(validator, "invalid string", context);
    validatePropertyWithELIsValid(validator, "?valid string", context);
}
/**
 * When evaluating the Expression Language fails, the validator must report the value as invalid
 * and explain that the evaluation failed.
 */
@Test
public void testRegexMatchingValidatorWithELError() {
    Validator val = StandardValidators.createRegexMatchingValidator(Pattern.compile("^\\?.*$"), true);
    ValidationContext vc = mock(ValidationContext.class);
    when(vc.isExpressionLanguagePresent(any())).thenReturn(true);
    when(vc.isExpressionLanguageSupported(any())).thenReturn(true);
    // Simulate the evaluation failure explicitly rather than depending on the
    // NullPointerException an unstubbed mock would cause inside the validator.
    when(vc.newPropertyValue(any())).thenThrow(new IllegalStateException("Simulated Expression Language evaluation failure"));

    ValidationResult vr = val.validate("foo", "invalid", vc);

    assertFalse(vr.isValid());
    assertThat(vr.getExplanation(), containsString("Failed to evaluate the Attribute Expression Language"));
}
/**
 * With Expression Language present but evaluation disabled, the input is accepted without
 * applying the pattern and the explanation states that Expression Language is present.
 */
@Test
public void testRegexMatchingValidatorWithELWithoutEvaluation() {
    final ValidationContext context = mock(ValidationContext.class);
    when(context.isExpressionLanguagePresent(any())).thenReturn(true);
    when(context.isExpressionLanguageSupported(any())).thenReturn(true);

    final Validator validator = StandardValidators.createRegexMatchingValidator(Pattern.compile("^\\?.*$"), false);
    final ValidationResult result = validator.validate("foo", "valid", context);

    assertTrue(result.isValid());
    assertEquals("Expression Language Present", result.getExplanation());
}
/** Asserts that the validator accepts {@code input} for the subject "foo". */
private void validatePropertyIsValid(final Validator val, final String input, final ValidationContext vc) {
    final boolean accepted = val.validate("foo", input, vc).isValid();
    assertTrue(accepted);
}
/** Asserts that the validator rejects {@code input} for the subject "foo". */
private void validatePropertyIsInvalid(final Validator val, final String input, final ValidationContext vc) {
    final boolean accepted = val.validate("foo", input, vc).isValid();
    assertFalse(accepted);
}
/**
 * Stubs the context so that evaluating {@code input} yields {@code input} itself, then asserts
 * that the validator accepts it for the subject "foo".
 */
private void validatePropertyWithELIsValid(Validator val, String input, ValidationContext vc) {
    final PropertyValue evaluated = mock(PropertyValue.class);
    when(vc.newPropertyValue(input)).thenReturn(evaluated);
    // evaluateAttributeExpressions() returns the same mock, so getValue() can be stubbed on it directly.
    when(evaluated.evaluateAttributeExpressions()).thenReturn(evaluated);
    when(evaluated.getValue()).thenReturn(input);

    assertTrue(val.validate("foo", input, vc).isValid());
}
/**
 * Stubs the context so that evaluating {@code input} yields {@code input} itself, then asserts
 * that the validator rejects it for the subject "foo".
 */
private void validatePropertyWithELIsInvalid(Validator val, String input, ValidationContext vc) {
    final PropertyValue evaluated = mock(PropertyValue.class);
    when(vc.newPropertyValue(input)).thenReturn(evaluated);
    // evaluateAttributeExpressions() returns the same mock, so getValue() can be stubbed on it directly.
    when(evaluated.evaluateAttributeExpressions()).thenReturn(evaluated);
    when(evaluated.getValue()).thenReturn(input);

    assertFalse(val.validate("foo", input, vc).isValid());
}
}

View File

@ -74,6 +74,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import static org.apache.nifi.util.StringUtils.isEmpty;
@ -91,7 +92,9 @@ import static org.apache.nifi.util.StringUtils.isEmpty;
})
public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
private static final String FORMAT_STORAGE_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
private static final Pattern SAS_TOKEN_PATTERN = Pattern.compile("^\\?.*$");
private static final String FORMAT_STORAGE_CONNECTION_STRING_FOR_ACCOUNT_KEY = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
private static final String FORMAT_STORAGE_CONNECTION_STRING_FOR_SAS_TOKEN = "BlobEndpoint=https://%s.blob.core.windows.net/;SharedAccessSignature=%s";
static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder()
.name("event-hub-namespace")
@ -228,7 +231,17 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
.sensitive(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(true)
.required(false)
.build();
/*
 * Optional, sensitive property holding the Azure Storage SAS token.
 * The regex validator is created with evaluateExpressions=true and SAS_TOKEN_PATTERN, so a
 * value that does not start with '?' is rejected with the message given below.
 */
static final PropertyDescriptor STORAGE_SAS_TOKEN = new PropertyDescriptor.Builder()
.name("storage-sas-token")
.displayName("Storage SAS Token")
.description("The Azure Storage SAS token to store Event Hub consumer group state. Always starts with a ? character.")
.sensitive(true)
.addValidator(StandardValidators.createRegexMatchingValidator(SAS_TOKEN_PATTERN, true,
"Token must start with a ? character."))
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(false)
.build();
static final PropertyDescriptor STORAGE_CONTAINER_NAME = new PropertyDescriptor.Builder()
.name("storage-container-name")
@ -261,7 +274,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
NAMESPACE, EVENT_HUB_NAME, ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY, USE_MANAGED_IDENTITY, CONSUMER_GROUP, CONSUMER_HOSTNAME,
RECORD_READER, RECORD_WRITER,
INITIAL_OFFSET, PREFETCH_COUNT, BATCH_SIZE, RECEIVE_TIMEOUT,
STORAGE_ACCOUNT_NAME, STORAGE_ACCOUNT_KEY, STORAGE_CONTAINER_NAME
STORAGE_ACCOUNT_NAME, STORAGE_ACCOUNT_KEY, STORAGE_SAS_TOKEN, STORAGE_CONTAINER_NAME
));
Set<Relationship> relationships = new HashSet<>();
@ -324,6 +337,9 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
final List<ValidationResult> results = new ArrayList<>();
final ControllerService recordReader = validationContext.getProperty(RECORD_READER).asControllerService();
final ControllerService recordWriter = validationContext.getProperty(RECORD_WRITER).asControllerService();
final String storageAccountKey = validationContext.getProperty(STORAGE_ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
final String storageSasToken = validationContext.getProperty(STORAGE_SAS_TOKEN).evaluateAttributeExpressions().getValue();
if ((recordReader != null && recordWriter == null) || (recordReader == null && recordWriter != null)) {
results.add(new ValidationResult.Builder()
.subject("Record Reader and Writer")
@ -332,6 +348,26 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
.valid(false)
.build());
}
if (StringUtils.isBlank(storageAccountKey) && StringUtils.isBlank(storageSasToken)) {
results.add(new ValidationResult.Builder()
.subject(String.format("%s or %s",
STORAGE_ACCOUNT_KEY.getDisplayName(), STORAGE_SAS_TOKEN.getDisplayName()))
.explanation(String.format("either %s or %s should be set.",
STORAGE_ACCOUNT_KEY.getDisplayName(), STORAGE_SAS_TOKEN.getDisplayName()))
.valid(false)
.build());
}
if (StringUtils.isNotBlank(storageAccountKey) && StringUtils.isNotBlank(storageSasToken)) {
results.add(new ValidationResult.Builder()
.subject(String.format("%s or %s",
STORAGE_ACCOUNT_KEY.getDisplayName(), STORAGE_SAS_TOKEN.getDisplayName()))
.explanation(String.format("%s and %s should not be set at the same time.",
STORAGE_ACCOUNT_KEY.getDisplayName(), STORAGE_SAS_TOKEN.getDisplayName()))
.valid(false)
.build());
}
results.addAll(AzureEventHubUtils.customValidate(ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY, validationContext));
return results;
}
@ -579,12 +615,6 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
final String eventHubName = context.getProperty(EVENT_HUB_NAME).evaluateAttributeExpressions().getValue();
validateRequiredProperty(EVENT_HUB_NAME, eventHubName);
final String storageAccountName = context.getProperty(STORAGE_ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
validateRequiredProperty(STORAGE_ACCOUNT_NAME, storageAccountName);
final String storageAccountKey = context.getProperty(STORAGE_ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
validateRequiredProperty(STORAGE_ACCOUNT_KEY, storageAccountKey);
final String consumerHostname = orDefault(context.getProperty(CONSUMER_HOSTNAME).evaluateAttributeExpressions().getValue(),
EventProcessorHost.createHostName("nifi"));
@ -617,7 +647,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
.evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
options.setReceiveTimeOut(Duration.ofMillis(receiveTimeoutMillis));
final String storageConnectionString = String.format(FORMAT_STORAGE_CONNECTION_STRING, storageAccountName, storageAccountKey);
final String storageConnectionString = createStorageConnectionString(context);
final String connectionString;
final boolean useManagedIdentity = context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
@ -630,6 +660,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
validateRequiredProperty(POLICY_PRIMARY_KEY, sasKey);
connectionString = AzureEventHubUtils.getSharedAccessSignatureConnectionString(namespaceName, eventHubName, sasName, sasKey);
}
eventProcessorHost = EventProcessorHost.EventProcessorHostBuilder
.newBuilder(consumerHostname, consumerGroupName)
.useAzureStorageCheckpointLeaseManager(storageConnectionString, containerName, null)
@ -645,6 +676,19 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
eventProcessorHost.registerEventProcessorFactory(new EventProcessorFactory(), options).get();
}
/**
 * Builds the Azure Storage connection string from the processor configuration.
 *
 * <p>The account-key format is used when the evaluated Storage Account Key is not blank;
 * otherwise the SAS-token format is used.
 *
 * @param context the process context the storage properties are read from
 * @return the connection string for the configured storage account
 */
private String createStorageConnectionString(final ProcessContext context) {
    final String storageAccountName = context.getProperty(STORAGE_ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
    validateRequiredProperty(STORAGE_ACCOUNT_NAME, storageAccountName);

    final String storageAccountKey = context.getProperty(STORAGE_ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
    final String storageSasToken = context.getProperty(STORAGE_SAS_TOKEN).evaluateAttributeExpressions().getValue();

    // customValidate treats a blank key as "not set", so apply the same blank check here.
    // With a plain null check, a key that evaluates to an empty string would win over a
    // configured SAS token and produce a connection string with an empty AccountKey.
    if (StringUtils.isNotBlank(storageAccountKey)) {
        return String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_ACCOUNT_KEY, storageAccountName, storageAccountKey);
    }
    return String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_SAS_TOKEN, storageAccountName, storageSasToken);
}
/**
 * Returns {@code defaultValue} when {@code isEmpty(value)} holds, otherwise {@code value}.
 */
private String orDefault(String value, String defaultValue) {
    if (isEmpty(value)) {
        return defaultValue;
    }
    return value;
}

View File

@ -73,8 +73,11 @@ import org.apache.nifi.serialization.record.MockRecordWriter;
public class TestConsumeAzureEventHub {
private static final String namespaceName = "nifi-azure-hub";
private static final String eventHubName = "get-test";
private static final String policyName = "test-pn";
private static final String policyKey = "test-pk";
private static final String storageAccountName = "test-sa";
private static final String storageAccountKey = "test-sa-key";
private static final String storageSasToken = "?test-sa-token";
private ConsumeAzureEventHub.EventProcessor eventProcessor;
private MockProcessSession processSession;
@ -132,12 +135,72 @@ public class TestConsumeAzureEventHub {
testRunner.assertValid();
}
/**
 * The configuration must remain invalid at every step when neither the storage account key
 * nor the storage SAS token is set.
 */
@Test
public void testProcessorConfigValidityWithNeitherStorageKeyNorTokenSet() {
    final TestRunner runner = TestRunners.newTestRunner(processor);

    runner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME, eventHubName);
    runner.assertNotValid();

    runner.setProperty(ConsumeAzureEventHub.NAMESPACE, namespaceName);
    runner.assertNotValid();

    runner.setProperty(ConsumeAzureEventHub.ACCESS_POLICY_NAME, policyName);
    runner.setProperty(ConsumeAzureEventHub.POLICY_PRIMARY_KEY, policyKey);
    runner.assertNotValid();

    // Only the account name is provided for storage; no key and no token.
    runner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_NAME, storageAccountName);
    runner.assertNotValid();
}
/**
 * The configuration must be invalid when both the storage account key and the storage SAS
 * token are set.
 */
@Test
public void testProcessorConfigValidityWithBothStorageKeyAndTokenSet() {
    final TestRunner runner = TestRunners.newTestRunner(processor);

    runner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME, eventHubName);
    runner.assertNotValid();

    runner.setProperty(ConsumeAzureEventHub.NAMESPACE, namespaceName);
    runner.assertNotValid();

    runner.setProperty(ConsumeAzureEventHub.ACCESS_POLICY_NAME, policyName);
    runner.setProperty(ConsumeAzureEventHub.POLICY_PRIMARY_KEY, policyKey);
    runner.assertNotValid();

    // Both credentials are provided for storage.
    runner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_NAME, storageAccountName);
    runner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_KEY, storageAccountKey);
    runner.setProperty(ConsumeAzureEventHub.STORAGE_SAS_TOKEN, storageSasToken);
    runner.assertNotValid();
}
/**
 * The configuration becomes valid once the storage account name and the storage SAS token
 * are set (without an account key).
 */
@Test
public void testProcessorConfigValidityWithTokenSet() {
    final TestRunner runner = TestRunners.newTestRunner(processor);

    runner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME, eventHubName);
    runner.assertNotValid();

    runner.setProperty(ConsumeAzureEventHub.NAMESPACE, namespaceName);
    runner.assertNotValid();

    runner.setProperty(ConsumeAzureEventHub.ACCESS_POLICY_NAME, policyName);
    runner.setProperty(ConsumeAzureEventHub.POLICY_PRIMARY_KEY, policyKey);
    runner.assertNotValid();

    // SAS token only.
    runner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_NAME, storageAccountName);
    runner.setProperty(ConsumeAzureEventHub.STORAGE_SAS_TOKEN, storageSasToken);
    runner.assertValid();
}
/**
 * The configuration becomes valid once the storage account name and the storage account key
 * are set (without a SAS token).
 */
@Test
public void testProcessorConfigValidityWithStorageKeySet() {
    final TestRunner runner = TestRunners.newTestRunner(processor);

    runner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME, eventHubName);
    runner.assertNotValid();

    runner.setProperty(ConsumeAzureEventHub.NAMESPACE, namespaceName);
    runner.assertNotValid();

    runner.setProperty(ConsumeAzureEventHub.ACCESS_POLICY_NAME, policyName);
    runner.setProperty(ConsumeAzureEventHub.POLICY_PRIMARY_KEY, policyKey);
    runner.assertNotValid();

    // Account key only.
    runner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_NAME, storageAccountName);
    runner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_KEY, storageAccountKey);
    runner.assertValid();
}
@Test
public void testReceivedApplicationProperties() throws Exception {
final EventData singleEvent = EventData.create("one".getBytes(StandardCharsets.UTF_8));
singleEvent.getProperties().put("event-sender", "Apache NiFi");
singleEvent.getProperties().put("application", "TestApp");
final Iterable<EventData> eventDataList = Arrays.asList(singleEvent);
final Iterable<EventData> eventDataList = Collections.singletonList(singleEvent);
eventProcessor.onEvents(partitionContext, eventDataList);
processSession.assertCommitted();
@ -150,7 +213,7 @@ public class TestConsumeAzureEventHub {
@Test
public void testReceiveOne() throws Exception {
final Iterable<EventData> eventDataList = Arrays.asList(EventData.create("one".getBytes(StandardCharsets.UTF_8)));
final Iterable<EventData> eventDataList = Collections.singletonList(EventData.create("one".getBytes(StandardCharsets.UTF_8)));
eventProcessor.onEvents(partitionContext, eventDataList);
processSession.assertCommitted();
@ -262,7 +325,7 @@ public class TestConsumeAzureEventHub {
.collect(Collectors.toList());
final List<Record> recordSetList = addEndRecord.apply(recordList);
final Record[] records = recordSetList.toArray(new Record[recordSetList.size()]);
final Record[] records = recordSetList.toArray(new Record[0]);
switch (throwExceptionAt) {
case -1:
@ -277,8 +340,8 @@ public class TestConsumeAzureEventHub {
default:
final List<Record> recordList1 = addEndRecord.apply(recordList.subList(0, throwExceptionAt));
final List<Record> recordList2 = addEndRecord.apply(recordList.subList(throwExceptionAt + 1, recordList.size()));
final Record[] records1 = recordList1.toArray(new Record[recordList1.size()]);
final Record[] records2 = recordList2.toArray(new Record[recordList2.size()]);
final Record[] records1 = recordList1.toArray(new Record[0]);
final Record[] records2 = recordList2.toArray(new Record[0]);
when(reader.nextRecord())
.thenReturn(records1[0], Arrays.copyOfRange(records1, 1, records1.length))
.thenThrow(new MalformedRecordException("Simulating Record parse failure."))