From 2a3a7d93795d0c7a7efbf91e195fc2a9a7c4633a Mon Sep 17 00:00:00 2001 From: Peter Turcsanyi Date: Sun, 12 Nov 2023 21:55:25 +0100 Subject: [PATCH] NIFI-11294 Support Component State Checkpoints in ConsumeAzureEventHub This closes #8013 Signed-off-by: David Handermann Co-authored-by: Peter Turcsanyi Co-authored-by: Malthe Borch --- .../apache/nifi/state/MockStateManager.java | 28 +- .../nifi-azure-processors/pom.xml | 6 + .../azure/eventhub/ConsumeAzureEventHub.java | 155 +++++-- .../checkpoint/CheckpointStoreKey.java | 33 ++ .../checkpoint/CheckpointStoreKeyPrefix.java | 33 ++ .../checkpoint/CheckpointStrategy.java | 48 +++ .../ComponentStateCheckpointStore.java | 333 +++++++++++++++ .../ComponentStateCheckpointStoreUtils.java | 163 +++++++ .../ClusterNodeDisconnectedException.java | 24 ++ ...omponentStateCheckpointStoreException.java | 28 ++ .../ConcurrentStateModificationException.java | 24 ++ .../exception/StateNotAvailableException.java | 24 ++ .../eventhub/TestConsumeAzureEventHub.java | 14 + .../AbstractCheckpointStoreTest.java | 220 ++++++++++ ...ractComponentStateCheckpointStoreTest.java | 72 ++++ ...ntStateCheckpointStoreConcurrencyTest.java | 182 ++++++++ ...ponentStateCheckpointStoreFailureTest.java | 255 +++++++++++ .../ComponentStateCheckpointStoreTest.java | 403 ++++++++++++++++++ ...omponentStateCheckpointStoreUtilsTest.java | 129 ++++++ 19 files changed, 2129 insertions(+), 45 deletions(-) create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/CheckpointStoreKey.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/CheckpointStoreKeyPrefix.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/CheckpointStrategy.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/ComponentStateCheckpointStore.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/ComponentStateCheckpointStoreUtils.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/exception/ClusterNodeDisconnectedException.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/exception/ComponentStateCheckpointStoreException.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/exception/ConcurrentStateModificationException.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/exception/StateNotAvailableException.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/checkpoint/AbstractCheckpointStoreTest.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/checkpoint/AbstractComponentStateCheckpointStoreTest.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/checkpoint/ComponentStateCheckpointStoreConcurrencyTest.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/checkpoint/ComponentStateCheckpointStoreFailureTest.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/checkpoint/ComponentStateCheckpointStoreTest.java create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/checkpoint/ComponentStateCheckpointStoreUtilsTest.java diff --git a/nifi-mock/src/main/java/org/apache/nifi/state/MockStateManager.java b/nifi-mock/src/main/java/org/apache/nifi/state/MockStateManager.java index 6086191d41..5ef3116223 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/state/MockStateManager.java +++ b/nifi-mock/src/main/java/org/apache/nifi/state/MockStateManager.java @@ -29,10 +29,20 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import static org.apache.nifi.state.MockStateManager.ExecutionMode.CLUSTERED; +import static org.apache.nifi.state.MockStateManager.ExecutionMode.STANDALONE; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; public class MockStateManager implements StateManager { + + public enum ExecutionMode { + // in CLUSTERED mode separate state maps are used for the CLUSTER and the LOCAL scopes + CLUSTERED, + // in STANDALONE mode the same state map (the local one) is used for both the CLUSTER and the LOCAL scopes + STANDALONE + } + private final AtomicInteger versionIndex = new AtomicInteger(0); private StateMap localStateMap = new MockStateMap(null, -1L); @@ -50,6 +60,8 @@ public class MockStateManager implements StateManager { private final boolean usesLocalState; private final boolean usesClusterState; + private ExecutionMode executionMode = CLUSTERED; + public MockStateManager(final Object component) { final Stateful stateful = component.getClass().getAnnotation(Stateful.class); if (stateful == null) { @@ -78,13 +90,17 @@ public class MockStateManager implements StateManager { localStateMap = new MockStateMap(null, -1L); } + public void setExecutionMode(ExecutionMode executionMode) { + this.executionMode = executionMode; + } + @Override public synchronized void setState(final Map state, final Scope scope) throws IOException { verifyAnnotation(scope); verifyCanSet(scope); final StateMap stateMap = new MockStateMap(state, versionIndex.incrementAndGet()); - if (scope == Scope.CLUSTER) { + if (scope == Scope.CLUSTER && executionMode == CLUSTERED) { clusterStateMap = stateMap; } else { localStateMap = stateMap; @@ -102,7 +118,7 @@ public class MockStateManager implements StateManager { verifyAnnotation(scope); if (scope == Scope.CLUSTER) { clusterRetrievedCount.incrementAndGet(); - return clusterStateMap; + return executionMode == CLUSTERED ? clusterStateMap : localStateMap; } else { localRetrievedCount.incrementAndGet(); return localStateMap; @@ -120,10 +136,14 @@ public class MockStateManager implements StateManager { public synchronized boolean replace(final StateMap oldValue, final Map newValue, final Scope scope) throws IOException { verifyAnnotation(scope); if (scope == Scope.CLUSTER) { - if (oldValue == clusterStateMap) { + if (executionMode == CLUSTERED && oldValue == clusterStateMap) { verifyCanSet(scope); clusterStateMap = new MockStateMap(newValue, versionIndex.incrementAndGet()); return true; + } else if (executionMode == STANDALONE && oldValue == localStateMap) { + verifyCanSet(scope); + localStateMap = new MockStateMap(newValue, versionIndex.incrementAndGet()); + return true; } return false; @@ -176,7 +196,7 @@ public class MockStateManager implements StateManager { private String getValue(final String key, final Scope scope) { final StateMap stateMap; - if (scope == Scope.CLUSTER) { + if (scope == Scope.CLUSTER && executionMode == CLUSTERED) { stateMap = clusterStateMap; } else { stateMap = localStateMap; diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml index 06b03f9965..202e427b55 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml @@ -199,6 +199,12 @@ 2.0.0-SNAPSHOT test + + io.projectreactor + reactor-test + 3.5.11 + test + diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java index 132705883a..de333ae6d9 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.azure.eventhub; +import com.azure.core.amqp.AmqpClientOptions; import com.azure.core.amqp.AmqpTransportType; import com.azure.core.amqp.exception.AmqpErrorCondition; import com.azure.core.amqp.exception.AmqpException; @@ -24,6 +25,7 @@ import com.azure.core.http.ProxyOptions; import com.azure.core.util.HttpClientOptions; import com.azure.identity.ManagedIdentityCredential; import com.azure.identity.ManagedIdentityCredentialBuilder; +import com.azure.messaging.eventhubs.CheckpointStore; import com.azure.messaging.eventhubs.EventData; import com.azure.messaging.eventhubs.EventProcessorClient; import com.azure.messaging.eventhubs.EventProcessorClientBuilder; @@ -36,16 +38,20 @@ import com.azure.messaging.eventhubs.models.PartitionContext; import com.azure.storage.blob.BlobContainerAsyncClient; import com.azure.storage.blob.BlobContainerClientBuilder; import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.Stateful; import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; @@ -58,6 +64,9 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.azure.eventhub.checkpoint.CheckpointStrategy; +import org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStore; +import org.apache.nifi.processors.azure.eventhub.checkpoint.exception.ComponentStateCheckpointStoreException; import org.apache.nifi.processors.azure.eventhub.position.EarliestEventPositionProvider; import org.apache.nifi.processors.azure.eventhub.position.LegacyBlobStorageEventPositionProvider; import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils; @@ -81,15 +90,19 @@ import java.io.OutputStream; import java.time.Duration; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.regex.Pattern; import static org.apache.commons.lang3.StringUtils.defaultIfBlank; +import static org.apache.nifi.processors.azure.eventhub.checkpoint.CheckpointStoreKey.CLIENT_ID; +import static org.apache.nifi.processors.azure.eventhub.checkpoint.CheckpointStoreKey.CLUSTERED; @Tags({"azure", "microsoft", "cloud", "eventhub", "events", "streaming", "streams"}) @CapabilityDescription("Receives messages from Microsoft Azure Event Hubs with checkpointing to ensure consistent event processing. " @@ -98,6 +111,8 @@ import static org.apache.commons.lang3.StringUtils.defaultIfBlank; + "In clustered environment, ConsumeAzureEventHub processor instances form a consumer group and the messages are distributed among the cluster nodes " + "(each message is processed on one cluster node only).") @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, description = "Local state is used to store the client id. " + + "Cluster state is used to store partition ownership and checkpoint information when component state is configured as the checkpointing strategy.") @TriggerSerially @WritesAttributes({ @WritesAttribute(attribute = "eventhub.enqueued.timestamp", description = "The time (in milliseconds since epoch, UTC) at which the message was enqueued in the event hub"), @@ -181,7 +196,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem static final PropertyDescriptor INITIAL_OFFSET = new PropertyDescriptor.Builder() .name("event-hub-initial-offset") .displayName("Initial Offset") - .description("Specify where to start receiving messages if offset is not stored in Azure Storage.") + .description("Specify where to start receiving messages if offset is not yet stored in the checkpoint store.") .required(true) .allowableValues(INITIAL_OFFSET_START_OF_STREAM, INITIAL_OFFSET_END_OF_STREAM) .defaultValue(INITIAL_OFFSET_END_OF_STREAM) @@ -218,12 +233,23 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem static final PropertyDescriptor RECEIVE_TIMEOUT = new PropertyDescriptor.Builder() .name("event-hub-message-receive-timeout") .displayName("Message Receive Timeout") - .description("The amount of time this consumer should wait to receive the Prefetch Count before returning.") + .description("The amount of time this consumer should wait to receive the Batch Size before returning.") .defaultValue("1 min") .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) .required(true) .build(); + + static final PropertyDescriptor CHECKPOINT_STRATEGY = new PropertyDescriptor.Builder() + .name("checkpoint-strategy") + .displayName("Checkpoint Strategy") + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .required(true) + .allowableValues(CheckpointStrategy.class) + .defaultValue(CheckpointStrategy.AZURE_BLOB_STORAGE.getValue()) + .description("Specifies which strategy to use for storing and retrieving partition ownership and checkpoint information for each partition.") + .build(); + static final PropertyDescriptor STORAGE_ACCOUNT_NAME = new PropertyDescriptor.Builder() .name("storage-account-name") .displayName("Storage Account Name") @@ -231,6 +257,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) .required(true) + .dependsOn(CHECKPOINT_STRATEGY, CheckpointStrategy.AZURE_BLOB_STORAGE) .build(); static final PropertyDescriptor STORAGE_ACCOUNT_KEY = new PropertyDescriptor.Builder() .name("storage-account-key") @@ -240,6 +267,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) .required(false) + .dependsOn(CHECKPOINT_STRATEGY, CheckpointStrategy.AZURE_BLOB_STORAGE) .build(); static final PropertyDescriptor STORAGE_SAS_TOKEN = new PropertyDescriptor.Builder() .name("storage-sas-token") @@ -250,6 +278,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem "Token must start with a ? character.")) .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) .required(false) + .dependsOn(CHECKPOINT_STRATEGY, CheckpointStrategy.AZURE_BLOB_STORAGE) .build(); static final PropertyDescriptor STORAGE_CONTAINER_NAME = new PropertyDescriptor.Builder() .name("storage-container-name") @@ -259,6 +288,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) .required(false) + .dependsOn(CHECKPOINT_STRATEGY, CheckpointStrategy.AZURE_BLOB_STORAGE) .build(); static final Relationship REL_SUCCESS = new Relationship.Builder() @@ -289,6 +319,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem PREFETCH_COUNT, BATCH_SIZE, RECEIVE_TIMEOUT, + CHECKPOINT_STRATEGY, STORAGE_ACCOUNT_NAME, STORAGE_ACCOUNT_KEY, STORAGE_SAS_TOKEN, @@ -301,10 +332,10 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem private volatile RecordReaderFactory readerFactory; private volatile RecordSetWriterFactory writerFactory; - private volatile String namespaceName; private volatile boolean isRecordReaderSet = false; private volatile boolean isRecordWriterSet = false; - private volatile String serviceBusEndpoint; + + private volatile String clientId; @Override protected List getSupportedPropertyDescriptors() { @@ -328,6 +359,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem final ControllerService recordWriter = validationContext.getProperty(RECORD_WRITER).asControllerService(); final String storageAccountKey = validationContext.getProperty(STORAGE_ACCOUNT_KEY).evaluateAttributeExpressions().getValue(); final String storageSasToken = validationContext.getProperty(STORAGE_SAS_TOKEN).evaluateAttributeExpressions().getValue(); + final CheckpointStrategy checkpointStrategy = CheckpointStrategy.valueOf(validationContext.getProperty(CHECKPOINT_STRATEGY).getValue()); if ((recordReader != null && recordWriter == null) || (recordReader == null && recordWriter != null)) { results.add(new ValidationResult.Builder() @@ -338,24 +370,26 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem .build()); } - if (StringUtils.isBlank(storageAccountKey) && StringUtils.isBlank(storageSasToken)) { - results.add(new ValidationResult.Builder() - .subject(String.format("%s or %s", - STORAGE_ACCOUNT_KEY.getDisplayName(), STORAGE_SAS_TOKEN.getDisplayName())) - .explanation(String.format("either %s or %s should be set.", - STORAGE_ACCOUNT_KEY.getDisplayName(), STORAGE_SAS_TOKEN.getDisplayName())) - .valid(false) - .build()); - } + if (checkpointStrategy == CheckpointStrategy.AZURE_BLOB_STORAGE) { + if (StringUtils.isBlank(storageAccountKey) && StringUtils.isBlank(storageSasToken)) { + results.add(new ValidationResult.Builder() + .subject(String.format("%s or %s", + STORAGE_ACCOUNT_KEY.getDisplayName(), STORAGE_SAS_TOKEN.getDisplayName())) + .explanation(String.format("either %s or %s should be set.", + STORAGE_ACCOUNT_KEY.getDisplayName(), STORAGE_SAS_TOKEN.getDisplayName())) + .valid(false) + .build()); + } - if (StringUtils.isNotBlank(storageAccountKey) && StringUtils.isNotBlank(storageSasToken)) { - results.add(new ValidationResult.Builder() - .subject(String.format("%s or %s", - STORAGE_ACCOUNT_KEY.getDisplayName(), STORAGE_SAS_TOKEN.getDisplayName())) - .explanation(String.format("%s and %s should not be set at the same time.", - STORAGE_ACCOUNT_KEY.getDisplayName(), STORAGE_SAS_TOKEN.getDisplayName())) - .valid(false) - .build()); + if (StringUtils.isNotBlank(storageAccountKey) && StringUtils.isNotBlank(storageSasToken)) { + results.add(new ValidationResult.Builder() + .subject(String.format("%s or %s", + STORAGE_ACCOUNT_KEY.getDisplayName(), STORAGE_SAS_TOKEN.getDisplayName())) + .explanation(String.format("%s and %s should not be set at the same time.", + STORAGE_ACCOUNT_KEY.getDisplayName(), STORAGE_SAS_TOKEN.getDisplayName())) + .valid(false) + .build()); + } } results.addAll(AzureEventHubUtils.customValidate(ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY, validationContext)); return results; @@ -370,6 +404,24 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem } } + @OnScheduled + public void onScheduled(final ProcessContext context) throws IOException { + StateManager stateManager = context.getStateManager(); + + String clientId = stateManager.getState(Scope.LOCAL).get(CLIENT_ID.key()); + if (clientId == null) { + clientId = UUID.randomUUID().toString(); + + final Map clientState = new HashMap<>(); + clientState.put(CLIENT_ID.key(), clientId); + clientState.put(CLUSTERED.key(), Boolean.toString(getNodeTypeProvider().isClustered())); + + stateManager.setState(clientState, Scope.LOCAL); + } + + this.clientId = clientId; + } + @Override public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) { if (eventProcessorClient == null) { @@ -398,25 +450,42 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem processSessionFactory = null; readerFactory = null; writerFactory = null; + clientId = null; } } protected EventProcessorClient createClient(final ProcessContext context) { - namespaceName = context.getProperty(NAMESPACE).evaluateAttributeExpressions().getValue(); + final String eventHubNamespace = context.getProperty(NAMESPACE).evaluateAttributeExpressions().getValue(); final String eventHubName = context.getProperty(EVENT_HUB_NAME).evaluateAttributeExpressions().getValue(); + final String serviceBusEndpoint = context.getProperty(SERVICE_BUS_ENDPOINT).getValue(); final String consumerGroup = context.getProperty(CONSUMER_GROUP).evaluateAttributeExpressions().getValue(); - final String containerName = defaultIfBlank(context.getProperty(STORAGE_CONTAINER_NAME).evaluateAttributeExpressions().getValue(), eventHubName); - final String storageConnectionString = createStorageConnectionString(context); - final BlobContainerClientBuilder blobContainerClientBuilder = new BlobContainerClientBuilder() - .connectionString(storageConnectionString) - .containerName(containerName); - final ProxyOptions storageProxyOptions = AzureStorageUtils.getProxyOptions(context); - if (storageProxyOptions != null) { - blobContainerClientBuilder.clientOptions(new HttpClientOptions().setProxyOptions(storageProxyOptions)); + final String fullyQualifiedNamespace = String.format("%s%s", eventHubNamespace, serviceBusEndpoint); + + final CheckpointStore checkpointStore; + final Map legacyPartitionEventPosition; + + final CheckpointStrategy checkpointStrategy = CheckpointStrategy.valueOf(context.getProperty(CHECKPOINT_STRATEGY).getValue()); + + if (checkpointStrategy == CheckpointStrategy.AZURE_BLOB_STORAGE) { + final String containerName = defaultIfBlank(context.getProperty(STORAGE_CONTAINER_NAME).evaluateAttributeExpressions().getValue(), eventHubName); + final String storageConnectionString = createStorageConnectionString(context); + final BlobContainerClientBuilder blobContainerClientBuilder = new BlobContainerClientBuilder() + .connectionString(storageConnectionString) + .containerName(containerName); + final ProxyOptions storageProxyOptions = AzureStorageUtils.getProxyOptions(context); + if (storageProxyOptions != null) { + blobContainerClientBuilder.clientOptions(new HttpClientOptions().setProxyOptions(storageProxyOptions)); + } + final BlobContainerAsyncClient blobContainerAsyncClient = blobContainerClientBuilder.buildAsyncClient(); + checkpointStore = new BlobCheckpointStore(blobContainerAsyncClient); + legacyPartitionEventPosition = getLegacyPartitionEventPosition(blobContainerAsyncClient, consumerGroup); + } else { + ComponentStateCheckpointStore componentStateCheckpointStore = new ComponentStateCheckpointStore(clientId, context.getStateManager()); + componentStateCheckpointStore.cleanUp(fullyQualifiedNamespace, eventHubName, consumerGroup); + checkpointStore = componentStateCheckpointStore; + legacyPartitionEventPosition = Collections.emptyMap(); } - final BlobContainerAsyncClient blobContainerAsyncClient = blobContainerClientBuilder.buildAsyncClient(); - final BlobCheckpointStore checkpointStore = new BlobCheckpointStore(blobContainerAsyncClient); final Long receiveTimeout = context.getProperty(RECEIVE_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS); final Duration maxWaitTime = Duration.ofMillis(receiveTimeout); @@ -426,12 +495,12 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem final EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder() .transportType(transportType) .consumerGroup(consumerGroup) + .clientOptions(new AmqpClientOptions().setIdentifier(clientId)) .trackLastEnqueuedEventProperties(true) .checkpointStore(checkpointStore) .processError(errorProcessor) .processEventBatch(eventBatchProcessor, maxBatchSize, maxWaitTime); - final String fullyQualifiedNamespace = String.format("%s%s", namespaceName, serviceBusEndpoint); final boolean useManagedIdentity = context.getProperty(USE_MANAGED_IDENTITY).asBoolean(); if (useManagedIdentity) { final ManagedIdentityCredentialBuilder managedIdentityCredentialBuilder = new ManagedIdentityCredentialBuilder(); @@ -439,7 +508,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem eventProcessorClientBuilder.credential(fullyQualifiedNamespace, eventHubName, managedIdentityCredential); } else { final String policyName = context.getProperty(ACCESS_POLICY_NAME).evaluateAttributeExpressions().getValue(); - final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).evaluateAttributeExpressions().getValue(); + final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue(); final AzureNamedKeyCredential azureNamedKeyCredential = new AzureNamedKeyCredential(policyName, policyKey); eventProcessorClientBuilder.credential(fullyQualifiedNamespace, eventHubName, azureNamedKeyCredential); } @@ -449,7 +518,6 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem eventProcessorClientBuilder.prefetchCount(prefetchCount); } - final Map legacyPartitionEventPosition = getLegacyPartitionEventPosition(blobContainerAsyncClient, consumerGroup); if (legacyPartitionEventPosition.isEmpty()) { final String initialOffset = context.getProperty(INITIAL_OFFSET).getValue(); // EventPosition.latest() is the default behavior is absence of existing checkpoints @@ -468,9 +536,8 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem } protected String getTransitUri(final PartitionContext partitionContext) { - return String.format("amqps://%s%s/%s/ConsumerGroups/%s/Partitions/%s", - namespaceName, - serviceBusEndpoint, + return String.format("amqps://%s/%s/ConsumerGroups/%s/Partitions/%s", + partitionContext.getFullyQualifiedNamespace(), partitionContext.getEventHubName(), partitionContext.getConsumerGroup(), partitionContext.getPartitionId() @@ -520,7 +587,14 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem } } - getLogger().error("Receive Events failed Namespace [{}] Event Hub [{}] Consumer Group [{}] Partition [{}]", + final String errorMessage; + if (throwable instanceof ComponentStateCheckpointStoreException) { + errorMessage = "Failed to access Component State Checkpoint Store"; + } else { + errorMessage = "Receive Events failed"; + } + + getLogger().error(errorMessage + ". Namespace [{}] Event Hub [{}] Consumer Group [{}] Partition [{}]", partitionContext.getFullyQualifiedNamespace(), partitionContext.getEventHubName(), partitionContext.getConsumerGroup(), @@ -666,8 +740,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem private String createStorageConnectionString(final ProcessContext context) { final String storageAccountName = context.getProperty(STORAGE_ACCOUNT_NAME).evaluateAttributeExpressions().getValue(); - - serviceBusEndpoint = context.getProperty(SERVICE_BUS_ENDPOINT).getValue(); + final String serviceBusEndpoint = context.getProperty(SERVICE_BUS_ENDPOINT).getValue(); final String domainName = serviceBusEndpoint.replace(".servicebus.", ""); final String storageAccountKey = context.getProperty(STORAGE_ACCOUNT_KEY).evaluateAttributeExpressions().getValue(); final String storageSasToken = context.getProperty(STORAGE_SAS_TOKEN).evaluateAttributeExpressions().getValue(); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/CheckpointStoreKey.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/CheckpointStoreKey.java new file mode 100644 index 0000000000..ccca12ebe6 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/CheckpointStoreKey.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.eventhub.checkpoint; + +public enum CheckpointStoreKey { + + CLIENT_ID("_clientId"), + CLUSTERED("_clustered"); + + private final String key; + + CheckpointStoreKey(String key) { + this.key = key; + } + + public String key() { + return key; + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/CheckpointStoreKeyPrefix.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/CheckpointStoreKeyPrefix.java new file mode 100644 index 0000000000..8f92ed69da --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/CheckpointStoreKeyPrefix.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.eventhub.checkpoint; + +public enum CheckpointStoreKeyPrefix { + + OWNERSHIP("ownership"), + CHECKPOINT("checkpoint"); + + private final String keyPrefix; + + CheckpointStoreKeyPrefix(String keyPrefix) { + this.keyPrefix = keyPrefix; + } + + public String keyPrefix() { + return keyPrefix; + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/CheckpointStrategy.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/CheckpointStrategy.java new file mode 100644 index 0000000000..8fe28d16dd --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/CheckpointStrategy.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.eventhub.checkpoint; + +import org.apache.nifi.components.DescribedValue; + +public enum CheckpointStrategy implements DescribedValue { + AZURE_BLOB_STORAGE("Azure Blob Storage", "Use Azure Blob Storage to store partition ownership and checkpoint information"), + COMPONENT_STATE("Component State", "Use component state to store partition ownership and checkpoint information"); + + private final String label; + private final String description; + + CheckpointStrategy(String label, String description) { + this.label = label; + this.description = description; + } + + @Override + public String getValue() { + return this.name(); + } + + @Override + public String getDisplayName() { + return label; + } + + @Override + public String getDescription() { + return description; + } +} + diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/ComponentStateCheckpointStore.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/ComponentStateCheckpointStore.java new file mode 100644 index 0000000000..26a1eb7045 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/ComponentStateCheckpointStore.java @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.eventhub.checkpoint; + +import com.azure.core.util.CoreUtils; +import com.azure.messaging.eventhubs.CheckpointStore; +import com.azure.messaging.eventhubs.models.Checkpoint; +import com.azure.messaging.eventhubs.models.PartitionContext; +import com.azure.messaging.eventhubs.models.PartitionOwnership; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.azure.eventhub.checkpoint.exception.ClusterNodeDisconnectedException; +import org.apache.nifi.processors.azure.eventhub.checkpoint.exception.ConcurrentStateModificationException; +import org.apache.nifi.processors.azure.eventhub.checkpoint.exception.StateNotAvailableException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; + +import static java.util.stream.Collectors.collectingAndThen; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; +import static org.apache.nifi.processors.azure.eventhub.checkpoint.CheckpointStoreKey.CLUSTERED; +import static org.apache.nifi.processors.azure.eventhub.checkpoint.CheckpointStoreKeyPrefix.CHECKPOINT; +import static org.apache.nifi.processors.azure.eventhub.checkpoint.CheckpointStoreKeyPrefix.OWNERSHIP; +import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.checkpointToString; +import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.convertOwnership; +import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.convertPartitionContext; +import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.createCheckpointKey; +import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.createCheckpointValue; +import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.createOwnershipKey; +import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.createOwnershipValue; +import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.ownershipListToString; +import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.ownershipToString; + +/** + * The {@link com.azure.messaging.eventhubs.CheckpointStore} is responsible for managing the storage of partition ownership and checkpoint information for Azure Event Hubs consumers. + * The underlying storage has to be persistent and centralized (shared across the consumer clients in the consumer group). + *

+ * There exist one ownership entry and one checkpoint entry for each partition in the store. They represent {@link com.azure.messaging.eventhubs.models.PartitionOwnership} + * and {@link com.azure.messaging.eventhubs.models.Checkpoint} entities in a storage-specific serialized form. + *

+ * The checkpoint store is plugged into {@link com.azure.messaging.eventhubs.EventProcessorClient} and directly used by the load balancer algorithm running in each consumer client instance. + *

+ * {@code ComponentStateCheckpointStore} stores the partition ownership and checkpoint information in the component's (that is {@link org.apache.nifi.processors.azure.eventhub.ConsumeAzureEventHub} + * processor's) state using NiFi's {@link org.apache.nifi.components.state.StateManager} in the background. + *

+ * The format of the ownership entry in the state map: + *

    ownership/event-hub-namespace/event-hub-name/consumer-group/partition-id -> client-id/last-modified-time/etag
+ *

+ * The format of the checkpoint entry in the state map: + *

    checkpoint/event-hub-namespace/event-hub-name/consumer-group/partition-id -> offset/sequence-number
+ *

+ * The checkpoint store is required to provide optimistic locking mechanism in order to avoid concurrent updating of the same ownership entry and therefore owning the same partition + * by multiple client instances at the same time. The optimistic locking is supposed to be based on the eTag field of {@link com.azure.messaging.eventhubs.models.PartitionOwnership} + * and should be supported at entry level (only updating the same partition ownership is conflicting, claiming ownership of 2 different partitions or updating 2 checkpoints in parallel are + * valid operations as they are independent changes). + *

+ * {@link org.apache.nifi.components.state.StateManager#replace(StateMap, Map, Scope)} method supports optimistic locking but only globally, in the scope of the whole state map (which may or may not + * contain conflicting changes after update). For this reason, the state update had to be implemented in 2 phases in {@link ComponentStateCheckpointStore#claimOwnership(List)}: + *

    + *
  • in the 1st phase the algorithm gets the current state and tries to set the ownership in memory based on eTag, the claim request is skipped if eTag + * does not match (the original eTag was retrieved in {@link ComponentStateCheckpointStore#listOwnership(String, String, String)})
  • + *
  • in the 2nd phase {@link org.apache.nifi.components.state.StateManager#replace(StateMap, Map, Scope)} is called to persist the new state and if it is not successful - meaning + * that another client instance changed the state in the meantime which may or may not be conflicting -, then the whole process needs to be started over with the 1st phase
  • + *
+ */ +public class ComponentStateCheckpointStore implements CheckpointStore { + + private static final Logger LOGGER = LoggerFactory.getLogger(ComponentStateCheckpointStore.class); + + private final String clientId; + + private final StateManager stateManager; + + public ComponentStateCheckpointStore(String clientId, StateManager stateManager) { + this.clientId = clientId; + this.stateManager = stateManager; + } + + /** + * Cleans up the underlying state map and retains only items matching the "EventHub coordinates" passed in ({@code fullyQualifiedNamespace}, {@code eventHubName} and {@code consumerGroup}). + * The method should be called once in the initialization phase in order to remove the obsolete items but the checkpoint store can operate properly without doing that too. + * + * @param fullyQualifiedNamespace the fullyQualifiedNamespace of the items to be retained + * @param eventHubName the eventHubName of the items to be retained + * @param consumerGroup the consumerGroup of the items to be retained + */ + public void cleanUp(String fullyQualifiedNamespace, String eventHubName, String consumerGroup) { + cleanUpMono(fullyQualifiedNamespace, eventHubName, consumerGroup) + .subscribe(); + } + + Mono cleanUpMono(String fullyQualifiedNamespace, String eventHubName, String consumerGroup) { + return getState() + .doFirst(() -> debug("cleanUp() -> Entering [{}, {}, {}]", fullyQualifiedNamespace, eventHubName, consumerGroup)) + .flatMap(oldState -> { + Map newMap = oldState.toMap().entrySet().stream() + .filter(e -> { + String key = e.getKey(); + if (!key.startsWith(OWNERSHIP.keyPrefix()) && !key.startsWith(CHECKPOINT.keyPrefix())) { + return true; + } + PartitionContext context = convertPartitionContext(key); + return context.getFullyQualifiedNamespace().equalsIgnoreCase(fullyQualifiedNamespace) + && context.getEventHubName().equalsIgnoreCase(eventHubName) + && context.getConsumerGroup().equalsIgnoreCase(consumerGroup); + }) + .collect(toMap(Map.Entry::getKey, Map.Entry::getValue)); + + int removed = oldState.toMap().size() - newMap.size(); + if (removed > 0) { + debug("cleanUp() -> Removed {} item(s)", removed); + return updateState(oldState, newMap); + } else { + debug("cleanUp() -> Nothing to clean up"); + return Mono.empty(); + } + }) + .doOnSuccess(__ -> debug("cleanUp() -> Succeeded")) + .retryWhen(createRetrySpec("cleanUp")) + .doOnError(throwable -> debug("cleanUp() -> Failed: {}", throwable.getMessage())); + } + + @Override + public Flux listOwnership(String fullyQualifiedNamespace, String eventHubName, String consumerGroup) { + return getState() + .doFirst(() -> debug("listOwnership() -> Entering [{}, {}, {}]", fullyQualifiedNamespace, eventHubName, consumerGroup)) + .flatMapMany(state -> { + checkDisconnectedNode(state); + + return getOwnerships(state); + }) + .filter(ownership -> + ownership.getFullyQualifiedNamespace().equalsIgnoreCase(fullyQualifiedNamespace) + && ownership.getEventHubName().equalsIgnoreCase(eventHubName) + && ownership.getConsumerGroup().equalsIgnoreCase(consumerGroup) + ) + .doOnNext(partitionOwnership -> debug("listOwnership() -> Returning {}", ownershipToString(partitionOwnership))) + .doOnComplete(() -> debug("listOwnership() -> Succeeded")) + .doOnError(throwable -> debug("listOwnership() -> Failed: {}", throwable.getMessage())); + } + + @Override + public Flux claimOwnership(List requestedPartitionOwnerships) { + return getState() + .doFirst(() -> debug("claimOwnership() -> Entering [{}]", ownershipListToString(requestedPartitionOwnerships))) + .flatMapMany(oldState -> { + checkDisconnectedNode(oldState); + + Map newMap = new HashMap<>(oldState.toMap()); + + List claimedOwnerships = new ArrayList<>(); + + long timestamp = System.currentTimeMillis(); + + for (PartitionOwnership requestedPartitionOwnership : requestedPartitionOwnerships) { + String key = createOwnershipKey(requestedPartitionOwnership); + + if (oldState.get(key) != null) { + PartitionOwnership oldPartitionOwnership = convertOwnership(key, oldState.get(key)); + + String oldETag = oldPartitionOwnership.getETag(); + String reqETag = requestedPartitionOwnership.getETag(); + if (StringUtils.isNotEmpty(oldETag) && !oldETag.equals(reqETag)) { + debug("claimOwnership() -> Already claimed {}", ownershipToString(oldPartitionOwnership)); + continue; + } + } + + String newETag = CoreUtils.randomUuid().toString(); + + PartitionOwnership partitionOwnership = new PartitionOwnership() + .setFullyQualifiedNamespace(requestedPartitionOwnership.getFullyQualifiedNamespace()) + .setEventHubName(requestedPartitionOwnership.getEventHubName()) + .setConsumerGroup(requestedPartitionOwnership.getConsumerGroup()) + .setPartitionId(requestedPartitionOwnership.getPartitionId()) + .setOwnerId(requestedPartitionOwnership.getOwnerId()) + .setLastModifiedTime(timestamp) + .setETag(newETag); + + claimedOwnerships.add(partitionOwnership); + + newMap.put(key, createOwnershipValue(partitionOwnership)); + + debug("claimOwnership() -> Claiming {}", ownershipToString(partitionOwnership)); + } + + if (claimedOwnerships.isEmpty()) { + return Flux.empty(); + } + + return updateState(oldState, newMap) + .thenMany(Flux.fromIterable(claimedOwnerships)); + }) + .doOnNext(partitionOwnership -> debug("claimOwnership() -> Returning {}", ownershipToString(partitionOwnership))) + .doOnComplete(() -> debug("claimOwnership() -> Succeeded")) + .retryWhen(createRetrySpec("claimOwnership")) + .doOnError(throwable -> debug("claimOwnership() -> Failed: {}", throwable.getMessage())); + } + + @Override + public Flux listCheckpoints(String fullyQualifiedNamespace, String eventHubName, String consumerGroup) { + return getState() + .doFirst(() -> debug("listCheckpoints() -> Entering [{}, {}, {}]", fullyQualifiedNamespace, eventHubName, consumerGroup)) + .flatMapMany(state -> { + checkDisconnectedNode(state); + + return getCheckpoints(state); + }) + .filter(checkpoint -> + checkpoint.getFullyQualifiedNamespace().equalsIgnoreCase(fullyQualifiedNamespace) + && checkpoint.getEventHubName().equalsIgnoreCase(eventHubName) + && checkpoint.getConsumerGroup().equalsIgnoreCase(consumerGroup) + ) + .doOnNext(checkpoint -> debug("listCheckpoints() -> Returning {}", checkpointToString(checkpoint))) + .doOnComplete(() -> debug("listCheckpoints() -> Succeeded")) + .doOnError(throwable -> debug("listCheckpoints() -> Failed: {}", throwable.getMessage())); + } + + @Override + public Mono updateCheckpoint(Checkpoint checkpoint) { + return getState() + .doFirst(() -> debug("updateCheckpoint() -> Entering [{}]", checkpointToString(checkpoint))) + .flatMap(oldState -> { + checkDisconnectedNode(oldState); + + Map newMap = new HashMap<>(oldState.toMap()); + + newMap.put(createCheckpointKey(checkpoint), createCheckpointValue(checkpoint)); + + return updateState(oldState, newMap); + }) + .doOnSuccess(__ -> debug("updateCheckpoint() -> Succeeded")) + .retryWhen(createRetrySpec("updateCheckpoint")) + .doOnError(throwable -> debug("updateCheckpoint() -> Failed: {}", throwable.getMessage())); + } + + private Retry createRetrySpec(String methodName) { + return Retry.max(10) + .filter(t -> t instanceof ConcurrentStateModificationException) + .doBeforeRetry(retrySignal -> debug(methodName + "() -> Retry: {}", retrySignal)) + .onRetryExhaustedThrow((retrySpec, retrySignal) -> new ConcurrentStateModificationException( + String.format("Retrials of concurrent state modifications has been exhausted (%d retrials)", 10))); + } + + private Flux getOwnerships(StateMap state) { + return getEntries(state, OWNERSHIP.keyPrefix(), ComponentStateCheckpointStoreUtils::convertOwnership); + } + + private Flux getCheckpoints(StateMap state) { + return getEntries(state, CHECKPOINT.keyPrefix(), ComponentStateCheckpointStoreUtils::convertCheckpoint); + } + + private Flux getEntries(StateMap state, String kind, BiFunction converter) throws ProcessException { + return state.toMap().entrySet().stream() + .filter(e -> e.getKey().startsWith(kind)) + .map(e -> converter.apply(e.getKey(), e.getValue())) + .collect(collectingAndThen(toList(), Flux::fromIterable)); + } + + private void checkDisconnectedNode(StateMap state) { + // if _isClustered key is available in the state (that is the local cache is accessed via cluster scope) and it is true, then it is a disconnected cluster node + boolean disconnectedNode = Boolean.parseBoolean(state.get(CLUSTERED.key())); + + if (disconnectedNode) { + throw new ClusterNodeDisconnectedException("The node has been disconnected from the cluster, the checkpoint store is not accessible"); + } + } + + private void debug(String message, Object... arguments) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("[clientId={}] " + message, ArrayUtils.addFirst(arguments, clientId)); + } + } + + private Mono getState() { + return Mono.defer( + () -> { + try { + StateMap state = stateManager.getState(Scope.CLUSTER); + return Mono.just(state); + } catch (Exception e) { + return Mono.error(new StateNotAvailableException(e)); + } + } + ); + } + + private Mono updateState(StateMap oldState, Map newMap) { + return Mono.defer( + () -> { + try { + boolean success = stateManager.replace(oldState, newMap, Scope.CLUSTER); + if (success) { + return Mono.empty(); + } else { + return Mono.error(new ConcurrentStateModificationException( + String.format("Component state with version [%s] has been modified by another instance" , oldState.getStateVersion().orElse("new")))); + } + } catch (Exception e) { + return Mono.error(new StateNotAvailableException(e)); + } + } + ); + } + +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/ComponentStateCheckpointStoreUtils.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/ComponentStateCheckpointStoreUtils.java new file mode 100644 index 0000000000..0960ab62c6 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/ComponentStateCheckpointStoreUtils.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.eventhub.checkpoint; + +import com.azure.messaging.eventhubs.models.Checkpoint; +import com.azure.messaging.eventhubs.models.PartitionContext; +import com.azure.messaging.eventhubs.models.PartitionOwnership; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.processor.exception.ProcessException; + +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.nifi.processors.azure.eventhub.checkpoint.CheckpointStoreKeyPrefix.CHECKPOINT; +import static org.apache.nifi.processors.azure.eventhub.checkpoint.CheckpointStoreKeyPrefix.OWNERSHIP; + +final class ComponentStateCheckpointStoreUtils { + + private ComponentStateCheckpointStoreUtils() { + } + + static PartitionOwnership convertOwnership(String key, String value) { + PartitionContext context = convertPartitionContext(key); + + final String[] parts = value.split("/", 3); + if (parts.length != 3) { + throw new ProcessException(String.format("Invalid ownership value: %s", value)); + } + + return new PartitionOwnership() + .setFullyQualifiedNamespace(context.getFullyQualifiedNamespace()) + .setEventHubName(context.getEventHubName()) + .setConsumerGroup(context.getConsumerGroup()) + .setPartitionId(context.getPartitionId()) + .setOwnerId(parts[0]) + .setLastModifiedTime(Long.parseLong(parts[1])) + .setETag(parts[2]); + } + + static Checkpoint convertCheckpoint(String key, String value) { + PartitionContext context = convertPartitionContext(key); + + final String[] parts = value.split("/", 2); + if (parts.length != 2) { + throw new ProcessException(String.format("Invalid checkpoint value: %s", value)); + } + + return new Checkpoint() + .setFullyQualifiedNamespace(context.getFullyQualifiedNamespace()) + .setEventHubName(context.getEventHubName()) + .setConsumerGroup(context.getConsumerGroup()) + .setPartitionId(context.getPartitionId()) + .setOffset(StringUtils.isNotEmpty(parts[0]) ? Long.parseLong(parts[0]) : null) + .setSequenceNumber(StringUtils.isNotEmpty(parts[1]) ? Long.parseLong(parts[1]): null); + } + + static PartitionContext convertPartitionContext(String key) { + final String[] parts = key.split("/", 5); + if (parts.length != 5) { + throw new ProcessException(String.format("Invalid entry key: %s", key)); + } + + final String fullyQualifiedNamespace = parts[1]; + final String eventHubName = parts[2]; + final String consumerGroup = parts[3]; + final String partitionId = parts[4]; + + return new PartitionContext( + fullyQualifiedNamespace, + eventHubName, + consumerGroup, + partitionId + ); + } + + static String createOwnershipKey(PartitionOwnership partitionOwnership) { + return createKey( + OWNERSHIP.keyPrefix(), + partitionOwnership.getFullyQualifiedNamespace(), + partitionOwnership.getEventHubName(), + partitionOwnership.getConsumerGroup(), + partitionOwnership.getPartitionId() + ); + } + + static String createCheckpointKey(Checkpoint checkpoint) { + return createKey( + CHECKPOINT.keyPrefix(), + checkpoint.getFullyQualifiedNamespace(), + checkpoint.getEventHubName(), + checkpoint.getConsumerGroup(), + checkpoint.getPartitionId() + ); + } + + private static String createKey(String kind, String fullyQualifiedNamespace, String eventHubName, String consumerGroup, String partitionId) { + return String.format( + "%s/%s/%s/%s/%s", + kind, + fullyQualifiedNamespace, + eventHubName, + consumerGroup, + partitionId + ); + } + + static String createOwnershipValue(PartitionOwnership partitionOwnership) { + return String.format("%s/%s/%s", + partitionOwnership.getOwnerId(), + partitionOwnership.getLastModifiedTime(), + partitionOwnership.getETag()); + } + + static String createCheckpointValue(Checkpoint checkpoint) { + return String.format("%s/%s", + checkpoint.getOffset() != null ? checkpoint.getOffset().toString() : "", + checkpoint.getSequenceNumber() != null ? checkpoint.getSequenceNumber().toString() : ""); + } + + static String ownershipToString(PartitionOwnership partitionOwnership) { + return "PartitionOwnership{" + + "fullyQualifiedNamespace='" + partitionOwnership.getFullyQualifiedNamespace() + '\'' + + ", eventHubName='" + partitionOwnership.getEventHubName() + '\'' + + ", consumerGroup='" + partitionOwnership.getConsumerGroup() + '\'' + + ", partitionId='" + partitionOwnership.getPartitionId() + '\'' + + ", ownerId='" + partitionOwnership.getOwnerId() + '\'' + + ", lastModifiedTime=" + partitionOwnership.getLastModifiedTime() + + ", eTag='" + partitionOwnership.getETag() + '\'' + + '}'; + } + + static List ownershipListToString(List partitionOwnershipList) { + return partitionOwnershipList.stream() + .map(ComponentStateCheckpointStoreUtils::ownershipToString) + .collect(Collectors.toList()); + } + + static String checkpointToString(Checkpoint checkpoint) { + return "Checkpoint{" + + "fullyQualifiedNamespace='" + checkpoint.getFullyQualifiedNamespace() + '\'' + + ", eventHubName='" + checkpoint.getEventHubName() + '\'' + + ", consumerGroup='" + checkpoint.getConsumerGroup() + '\'' + + ", partitionId='" + checkpoint.getPartitionId() + '\'' + + ", offset=" + checkpoint.getOffset() + + ", sequenceNumber=" + checkpoint.getSequenceNumber() + + '}'; + } + +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/exception/ClusterNodeDisconnectedException.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/exception/ClusterNodeDisconnectedException.java new file mode 100644 index 0000000000..374b2d2c47 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/exception/ClusterNodeDisconnectedException.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.eventhub.checkpoint.exception; + +public class ClusterNodeDisconnectedException extends ComponentStateCheckpointStoreException { + + public ClusterNodeDisconnectedException(String message) { + super(message); + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/exception/ComponentStateCheckpointStoreException.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/exception/ComponentStateCheckpointStoreException.java new file mode 100644 index 0000000000..56c5ce1379 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/exception/ComponentStateCheckpointStoreException.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.eventhub.checkpoint.exception; + +public class ComponentStateCheckpointStoreException extends RuntimeException { + + public ComponentStateCheckpointStoreException(String message) { + super(message); + } + + public ComponentStateCheckpointStoreException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/exception/ConcurrentStateModificationException.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/exception/ConcurrentStateModificationException.java new file mode 100644 index 0000000000..efdc9aff35 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/exception/ConcurrentStateModificationException.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.eventhub.checkpoint.exception; + +public class ConcurrentStateModificationException extends ComponentStateCheckpointStoreException { + + public ConcurrentStateModificationException(String message) { + super(message); + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/exception/StateNotAvailableException.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/exception/StateNotAvailableException.java new file mode 100644 index 0000000000..434ba6ed1c --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/exception/StateNotAvailableException.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.eventhub.checkpoint.exception; + +public class StateNotAvailableException extends ComponentStateCheckpointStoreException { + + public StateNotAvailableException(Throwable cause) { + super("Failure when reading/writing the component state", cause); + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java index 3268d7ccf4..3ab13ee1d4 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java @@ -24,6 +24,7 @@ import com.azure.messaging.eventhubs.models.EventBatchContext; import com.azure.messaging.eventhubs.models.PartitionContext; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.azure.eventhub.checkpoint.CheckpointStrategy; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.proxy.ProxyConfiguration; @@ -223,6 +224,19 @@ public class TestConsumeAzureEventHub { testRunner.assertValid(); } + @Test + public void testProcessorConfigValidityWithComponentStateCheckpointStrategy() throws InitializationException { + testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME, EVENT_HUB_NAME); + testRunner.assertNotValid(); + testRunner.setProperty(ConsumeAzureEventHub.NAMESPACE, EVENT_HUB_NAMESPACE); + testRunner.assertNotValid(); + testRunner.setProperty(ConsumeAzureEventHub.ACCESS_POLICY_NAME, POLICY_NAME); + testRunner.setProperty(ConsumeAzureEventHub.POLICY_PRIMARY_KEY, POLICY_KEY); + testRunner.assertNotValid(); + testRunner.setProperty(ConsumeAzureEventHub.CHECKPOINT_STRATEGY, CheckpointStrategy.COMPONENT_STATE.getValue()); + testRunner.assertValid(); + } + @Test public void testReceiveOne() { setProperties(); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/checkpoint/AbstractCheckpointStoreTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/checkpoint/AbstractCheckpointStoreTest.java new file mode 100644 index 0000000000..97eef55ef4 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/checkpoint/AbstractCheckpointStoreTest.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.eventhub.checkpoint; + +import com.azure.messaging.eventhubs.models.Checkpoint; +import com.azure.messaging.eventhubs.models.PartitionOwnership; +import org.junit.jupiter.api.BeforeEach; + +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +abstract class AbstractCheckpointStoreTest { + + static final String EVENT_HUB_NAMESPACE = "my-event-hub-namespace"; + static final String EVENT_HUB_NAME = "my-event-hub-name"; + static final String CONSUMER_GROUP = "my-consumer-group"; + + static final String PARTITION_ID_1 = "1"; + static final String PARTITION_ID_2 = "2"; + + static final String CLIENT_ID_1 = "client-id-1"; + static final String CLIENT_ID_2 = "client-id-2"; + + static final Long LAST_MODIFIED_TIME = 1234567890L; + static final String ETAG = "my-etag"; + + static final Long OFFSET = 10L; + static final Long SEQUENCE_NUMBER = 1L; + + PartitionOwnership partitionOwnership1; + PartitionOwnership partitionOwnership2; + + Checkpoint checkpoint1; + Checkpoint checkpoint2; + + @BeforeEach + void initTestData() { + partitionOwnership1 = createPartitionOwnership(PARTITION_ID_1, CLIENT_ID_1); + partitionOwnership2 = createPartitionOwnership(PARTITION_ID_2, CLIENT_ID_2); + + checkpoint1 = createCheckpoint(PARTITION_ID_1, OFFSET, SEQUENCE_NUMBER); + checkpoint2 = createCheckpoint(PARTITION_ID_2, OFFSET, SEQUENCE_NUMBER); + } + + PartitionOwnership createPartitionOwnership(String partitionId, String ownerId) { + return createPartitionOwnership( + EVENT_HUB_NAMESPACE, + EVENT_HUB_NAME, + CONSUMER_GROUP, + partitionId, + ownerId + ); + } + + PartitionOwnership createPartitionOwnership( + String fullyQualifiedNamespace, + String eventHubName, + String consumerGroup, + String partitionId, + String ownerId) { + return new TestablePartitionOwnership() + .setFullyQualifiedNamespace(fullyQualifiedNamespace) + .setEventHubName(eventHubName) + .setConsumerGroup(consumerGroup) + .setPartitionId(partitionId) + .setOwnerId(ownerId) + .setLastModifiedTime(null) + .setETag(null); + } + + Checkpoint createCheckpoint(String partitionId, Long offset, Long sequenceNumber) { + return createCheckpoint( + EVENT_HUB_NAMESPACE, + EVENT_HUB_NAME, + CONSUMER_GROUP, + partitionId, + offset, + sequenceNumber + ); + } + + Checkpoint createCheckpoint( + String fullyQualifiedNamespace, + String eventHubName, + String consumerGroup, + String partitionId, + Long offset, + Long sequenceNumber) { + return new TestableCheckpoint() + .setFullyQualifiedNamespace(fullyQualifiedNamespace) + .setEventHubName(eventHubName) + .setConsumerGroup(consumerGroup) + .setPartitionId(partitionId) + .setOffset(offset) + .setSequenceNumber(sequenceNumber); + } + + PartitionOwnership copy(PartitionOwnership original) { + return convertToTestable(original); + } + + Checkpoint copy(Checkpoint original) { + return convertToTestable(original); + } + + PartitionOwnership convertToTestable(PartitionOwnership original) { + return new TestablePartitionOwnership() + .setFullyQualifiedNamespace(original.getFullyQualifiedNamespace()) + .setEventHubName(original.getEventHubName()) + .setConsumerGroup(original.getConsumerGroup()) + .setPartitionId(original.getPartitionId()) + .setOwnerId(original.getOwnerId()) + .setLastModifiedTime(original.getLastModifiedTime()) + .setETag(original.getETag()); + } + + Checkpoint convertToTestable(Checkpoint original) { + return new TestableCheckpoint() + .setFullyQualifiedNamespace(original.getFullyQualifiedNamespace()) + .setEventHubName(original.getEventHubName()) + .setConsumerGroup(original.getConsumerGroup()) + .setPartitionId(original.getPartitionId()) + .setOffset(original.getOffset()) + .setSequenceNumber(original.getSequenceNumber()); + } + + List convertToTestablePartitionOwnerships(List partitionOwnerships) { + return partitionOwnerships.stream() + .map(this::convertToTestable) + .collect(Collectors.toList()); + } + + List convertToTestableCheckpoints(List checkpoints) { + return checkpoints.stream() + .map(this::convertToTestable) + .collect(Collectors.toList()); + } + + static class TestablePartitionOwnership extends PartitionOwnership { + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TestablePartitionOwnership that = (TestablePartitionOwnership) o; + return Objects.equals(getFullyQualifiedNamespace(), that.getFullyQualifiedNamespace()) + && Objects.equals(getEventHubName(), that.getEventHubName()) + && Objects.equals(getConsumerGroup(), that.getConsumerGroup()) + && Objects.equals(getPartitionId(), that.getPartitionId()) + && Objects.equals(getOwnerId(), that.getOwnerId()) + && Objects.equals(getLastModifiedTime(), that.getLastModifiedTime()) + && Objects.equals(getETag(), that.getETag()); + } + + @Override + public int hashCode() { + return Objects.hash( + getFullyQualifiedNamespace(), + getEventHubName(), + getConsumerGroup(), + getPartitionId(), + getOwnerId(), + getLastModifiedTime(), + getETag() + ); + } + } + + static class TestableCheckpoint extends Checkpoint { + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TestableCheckpoint that = (TestableCheckpoint) o; + return Objects.equals(getFullyQualifiedNamespace(), that.getFullyQualifiedNamespace()) + && Objects.equals(getEventHubName(), that.getEventHubName()) + && Objects.equals(getConsumerGroup(), that.getConsumerGroup()) + && Objects.equals(getPartitionId(), that.getPartitionId()) + && Objects.equals(getOffset(), that.getOffset()) + && Objects.equals(getSequenceNumber(), that.getSequenceNumber()); + } + + @Override + public int hashCode() { + return Objects.hash( + getFullyQualifiedNamespace(), + getEventHubName(), + getConsumerGroup(), + getPartitionId(), + getOffset(), + getSequenceNumber() + ); + } + } + +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/checkpoint/AbstractComponentStateCheckpointStoreTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/checkpoint/AbstractComponentStateCheckpointStoreTest.java new file mode 100644 index 0000000000..9341cca72f --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/checkpoint/AbstractComponentStateCheckpointStoreTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.eventhub.checkpoint; + +import com.azure.core.util.CoreUtils; +import com.azure.messaging.eventhubs.models.PartitionOwnership; +import org.apache.nifi.components.state.StateManager; +import org.junit.jupiter.api.BeforeEach; + +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +abstract class AbstractComponentStateCheckpointStoreTest extends AbstractCheckpointStoreTest { + + ComponentStateCheckpointStore checkpointStore; + + @BeforeEach + void initCheckpointStore() { + checkpointStore = new ComponentStateCheckpointStore(CLIENT_ID_1, getStateManager()); + } + + abstract StateManager getStateManager(); + + PartitionOwnership setETagAndLastModified(PartitionOwnership partitionOwnership) { + return partitionOwnership.setETag(CoreUtils.randomUuid().toString()) + .setLastModifiedTime(System.currentTimeMillis() - 1000); + } + + void assertClaimedOwnership(PartitionOwnership requestedOwnership, PartitionOwnership claimedOwnership) { + assertEquals(requestedOwnership.getFullyQualifiedNamespace(), claimedOwnership.getFullyQualifiedNamespace()); + assertEquals(requestedOwnership.getEventHubName(), claimedOwnership.getEventHubName()); + assertEquals(requestedOwnership.getConsumerGroup(), claimedOwnership.getConsumerGroup()); + assertEquals(requestedOwnership.getPartitionId(), claimedOwnership.getPartitionId()); + + assertEquals(requestedOwnership.getOwnerId(), claimedOwnership.getOwnerId()); + + assertNotNull(claimedOwnership.getLastModifiedTime()); + assertThat(claimedOwnership.getLastModifiedTime(), greaterThan(requestedOwnership.getLastModifiedTime() != null ? requestedOwnership.getLastModifiedTime() : 0)); + + assertNotNull(claimedOwnership.getETag()); + assertNotEquals(requestedOwnership.getETag(), claimedOwnership.getETag()); + } + + Map initMap(PartitionOwnership... partitionOwnerships) { + return Stream.of(partitionOwnerships) + .map(this::copy) + .map(this::setETagAndLastModified) + .collect(Collectors.toMap(ComponentStateCheckpointStoreUtils::createOwnershipKey, ComponentStateCheckpointStoreUtils::createOwnershipValue)); + } + +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/checkpoint/ComponentStateCheckpointStoreConcurrencyTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/checkpoint/ComponentStateCheckpointStoreConcurrencyTest.java new file mode 100644 index 0000000000..f10aa761c4 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/checkpoint/ComponentStateCheckpointStoreConcurrencyTest.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.eventhub.checkpoint; + +import com.azure.messaging.eventhubs.models.PartitionOwnership; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.state.MockStateMap; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.createCheckpointKey; +import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.createCheckpointValue; +import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.createOwnershipKey; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.startsWith; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class ComponentStateCheckpointStoreConcurrencyTest extends AbstractComponentStateCheckpointStoreTest { + + @Mock(strictness = Mock.Strictness.WARN) + private StateManager stateManager; + + @Captor + private ArgumentCaptor> updatedMapCaptor; + + @Override + StateManager getStateManager() { + return stateManager; + } + + @Test + void testConcurrentClaimDifferentOwnerships() throws IOException { + StateMap state1 = new MockStateMap(initMap(), 1); + StateMap state2 = new MockStateMap(initMap(partitionOwnership1), 2); + + when(stateManager.getState(Scope.CLUSTER)) + .thenReturn(state1) + .thenReturn(state2); + + when(stateManager.replace(eq(state1), anyMap(), eq(Scope.CLUSTER))).thenReturn(false); + when(stateManager.replace(eq(state2), anyMap(), eq(Scope.CLUSTER))).thenReturn(true); + + List requestedOwnerships = Collections.singletonList(partitionOwnership2); + + List claimedOwnerships = new ArrayList<>(); + checkpointStore.claimOwnership(requestedOwnerships).subscribe(claimedOwnerships::add); + + assertThat(claimedOwnerships, hasSize(1)); + PartitionOwnership claimedOwnership = claimedOwnerships.get(0); + assertClaimedOwnership(partitionOwnership2, claimedOwnership); + + verify(stateManager, times(2)).getState(eq(Scope.CLUSTER)); + verify(stateManager, times(2)).replace(any(StateMap.class), updatedMapCaptor.capture(), eq(Scope.CLUSTER)); + verifyNoMoreInteractions(stateManager); + + Map updatedMap1 = updatedMapCaptor.getAllValues().get(0); + assertThat(updatedMap1.size(), is(equalTo(1))); + assertThat(updatedMap1, hasEntry(equalTo(createOwnershipKey(partitionOwnership2)), startsWith(partitionOwnership2.getOwnerId()))); + + Map updatedMap2 = updatedMapCaptor.getAllValues().get(1); + assertThat(updatedMap2.size(), is(equalTo(2))); + assertThat(updatedMap2, hasEntry(equalTo(createOwnershipKey(partitionOwnership1)), startsWith(partitionOwnership1.getOwnerId()))); + assertThat(updatedMap2, hasEntry(equalTo(createOwnershipKey(partitionOwnership2)), startsWith(partitionOwnership2.getOwnerId()))); + } + + @Test + void testConcurrentClaimSameOwnership() throws IOException { + StateMap state1 = new MockStateMap(initMap(), 1); + StateMap state2 = new MockStateMap(initMap(partitionOwnership1), 2); + + when(stateManager.getState(Scope.CLUSTER)) + .thenReturn(state1) + .thenReturn(state2); + + when(stateManager.replace(eq(state1), anyMap(), eq(Scope.CLUSTER))).thenReturn(false); + + List requestedOwnerships = Collections.singletonList(partitionOwnership1); + + List claimedOwnerships = new ArrayList<>(); + checkpointStore.claimOwnership(requestedOwnerships).subscribe(claimedOwnerships::add); + + assertThat(claimedOwnerships, hasSize(0)); + + verify(stateManager, times(2)).getState(eq(Scope.CLUSTER)); + verify(stateManager, times(1)).replace(any(StateMap.class), updatedMapCaptor.capture(), eq(Scope.CLUSTER)); + verifyNoMoreInteractions(stateManager); + + Map updatedMap1 = updatedMapCaptor.getAllValues().get(0); + assertThat(updatedMap1.size(), is(equalTo(1))); + assertThat(updatedMap1, hasEntry(equalTo(createOwnershipKey(partitionOwnership1)), startsWith(partitionOwnership1.getOwnerId()))); + } + + @Test + void testConcurrentUpdateCheckpoint() throws IOException { + StateMap state1 = new MockStateMap(initMap(partitionOwnership1), 1); + StateMap state2 = new MockStateMap(initMap(partitionOwnership1, partitionOwnership2), 2); + + when(stateManager.getState(Scope.CLUSTER)) + .thenReturn(state1) + .thenReturn(state2); + + when(stateManager.replace(eq(state1), anyMap(), eq(Scope.CLUSTER))).thenReturn(false); + when(stateManager.replace(eq(state2), anyMap(), eq(Scope.CLUSTER))).thenReturn(true); + + checkpointStore.updateCheckpoint(checkpoint1).subscribe(); + + verify(stateManager, times(2)).getState(eq(Scope.CLUSTER)); + verify(stateManager, times(2)).replace(any(StateMap.class), updatedMapCaptor.capture(), eq(Scope.CLUSTER)); + verifyNoMoreInteractions(stateManager); + + Map updatedMap1 = updatedMapCaptor.getAllValues().get(0); + assertThat(updatedMap1.size(), is(equalTo(2))); + assertThat(updatedMap1, hasEntry(equalTo(createOwnershipKey(partitionOwnership1)), startsWith(partitionOwnership1.getOwnerId()))); + assertThat(updatedMap1, hasEntry(createCheckpointKey(checkpoint1), createCheckpointValue(checkpoint1))); + + Map updatedMap2 = updatedMapCaptor.getAllValues().get(1); + assertThat(updatedMap2.size(), is(equalTo(3))); + assertThat(updatedMap2, hasEntry(equalTo(createOwnershipKey(partitionOwnership1)), startsWith(partitionOwnership1.getOwnerId()))); + assertThat(updatedMap2, hasEntry(equalTo(createOwnershipKey(partitionOwnership2)), startsWith(partitionOwnership2.getOwnerId()))); + assertThat(updatedMap1, hasEntry(createCheckpointKey(checkpoint1), createCheckpointValue(checkpoint1))); + } + + @Test + void testConcurrentCleanUp() throws IOException { + StateMap state1 = new MockStateMap(initMap(partitionOwnership1), 1); + StateMap state2 = new MockStateMap(initMap(), 2); + + when(stateManager.getState(Scope.CLUSTER)) + .thenReturn(state1) + .thenReturn(state2); + + when(stateManager.replace(eq(state1), anyMap(), eq(Scope.CLUSTER))).thenReturn(false); + + checkpointStore.cleanUp(EVENT_HUB_NAMESPACE, EVENT_HUB_NAME, CONSUMER_GROUP + "-2"); + + verify(stateManager, times(2)).getState(eq(Scope.CLUSTER)); + verify(stateManager, times(1)).replace(any(StateMap.class), updatedMapCaptor.capture(), eq(Scope.CLUSTER)); + verifyNoMoreInteractions(stateManager); + + Map updatedMap1 = updatedMapCaptor.getAllValues().get(0); + assertTrue(updatedMap1.isEmpty()); + } + +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/checkpoint/ComponentStateCheckpointStoreFailureTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/checkpoint/ComponentStateCheckpointStoreFailureTest.java new file mode 100644 index 0000000000..0812bbf18e --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/checkpoint/ComponentStateCheckpointStoreFailureTest.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.eventhub.checkpoint; + +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.processors.azure.eventhub.checkpoint.exception.ConcurrentStateModificationException; +import org.apache.nifi.processors.azure.eventhub.checkpoint.exception.StateNotAvailableException; +import org.apache.nifi.state.MockStateMap; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import reactor.test.StepVerifier; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; + +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class ComponentStateCheckpointStoreFailureTest extends AbstractComponentStateCheckpointStoreTest { + + @Mock(strictness = Mock.Strictness.WARN) + private StateManager stateManager; + + @Override + StateManager getStateManager() { + return stateManager; + } + + @Test + void testListOwnership_GetState_IOException() throws IOException { + when(stateManager.getState(Scope.CLUSTER)).thenThrow(IOException.class); + + StepVerifier.create(checkpointStore.listOwnership(EVENT_HUB_NAMESPACE, EVENT_HUB_NAME, CONSUMER_GROUP)) + .expectError(StateNotAvailableException.class) + .verify(); + + verify(stateManager).getState(Scope.CLUSTER); + verifyNoMoreInteractions(stateManager); + } + + @Test + void testListCheckpoints_GetState_IOException() throws IOException { + when(stateManager.getState(Scope.CLUSTER)).thenThrow(IOException.class); + + StepVerifier.create(checkpointStore.listCheckpoints(EVENT_HUB_NAMESPACE, EVENT_HUB_NAME, CONSUMER_GROUP)) + .expectError(StateNotAvailableException.class) + .verify(); + + verify(stateManager).getState(Scope.CLUSTER); + verifyNoMoreInteractions(stateManager); + } + + @Test + void testClaimOwnership_GetState_IOException() throws IOException { + when(stateManager.getState(Scope.CLUSTER)).thenThrow(IOException.class); + + StepVerifier.create(checkpointStore.claimOwnership(Collections.singletonList(partitionOwnership1))) + .expectError(StateNotAvailableException.class) + .verify(); + + verify(stateManager, times(1)).getState(Scope.CLUSTER); + verifyNoMoreInteractions(stateManager); + } + + @Test + void testClaimOwnership_ReplaceState_IOException() throws IOException { + StateMap state = new MockStateMap(new HashMap<>(), 1); + when(stateManager.getState(Scope.CLUSTER)).thenReturn(state); + when(stateManager.replace(eq(state), anyMap(), eq(Scope.CLUSTER))).thenThrow(IOException.class); + + StepVerifier.create(checkpointStore.claimOwnership(Collections.singletonList(partitionOwnership1))) + .expectError(StateNotAvailableException.class) + .verify(); + + verify(stateManager).getState(Scope.CLUSTER); + verify(stateManager).replace(eq(state), anyMap(), eq(Scope.CLUSTER)); + verifyNoMoreInteractions(stateManager); + } + + @Test + void testUpdateCheckpoint_GetState_IOException() throws IOException { + when(stateManager.getState(Scope.CLUSTER)).thenThrow(IOException.class); + + StepVerifier.create(checkpointStore.updateCheckpoint(checkpoint1)) + .expectError(StateNotAvailableException.class) + .verify(); + + verify(stateManager, times(1)).getState(Scope.CLUSTER); + verifyNoMoreInteractions(stateManager); + } + + @Test + void testUpdateCheckpoint_ReplaceState_IOException() throws IOException { + StateMap state = new MockStateMap(new HashMap<>(), 1); + when(stateManager.getState(Scope.CLUSTER)).thenReturn(state); + when(stateManager.replace(eq(state), anyMap(), eq(Scope.CLUSTER))).thenThrow(IOException.class); + + StepVerifier.create(checkpointStore.updateCheckpoint(checkpoint1)) + .expectError(StateNotAvailableException.class) + .verify(); + + verify(stateManager, times(1)).getState(Scope.CLUSTER); + verify(stateManager).replace(eq(state), anyMap(), eq(Scope.CLUSTER)); + verifyNoMoreInteractions(stateManager); + } + + @Test + void testCleanUp_GetState_IOException() throws IOException { + when(stateManager.getState(Scope.CLUSTER)).thenThrow(IOException.class); + + StepVerifier.create(checkpointStore.cleanUpMono(EVENT_HUB_NAMESPACE, EVENT_HUB_NAME, CONSUMER_GROUP + "-2")) + .expectError(StateNotAvailableException.class) + .verify(); + + verify(stateManager).getState(Scope.CLUSTER); + verifyNoMoreInteractions(stateManager); + } + + @Test + void testCleanUp_ReplaceState_IOException() throws IOException { + StateMap state = new MockStateMap(new HashMap<>(initMap(partitionOwnership1)), 1); + when(stateManager.getState(Scope.CLUSTER)).thenReturn(state); + when(stateManager.replace(eq(state), anyMap(), eq(Scope.CLUSTER))).thenThrow(IOException.class); + + StepVerifier.create(checkpointStore.cleanUpMono(EVENT_HUB_NAMESPACE, EVENT_HUB_NAME, CONSUMER_GROUP + "-2")) + .expectError(StateNotAvailableException.class) + .verify(); + + verify(stateManager, times(1)).getState(Scope.CLUSTER); + verify(stateManager).replace(eq(state), anyMap(), eq(Scope.CLUSTER)); + verifyNoMoreInteractions(stateManager); + } + + @Test + void testClaimOwnership_ReplaceState_ConcurrentStateModificationException_Success() throws IOException { + StateMap state = new MockStateMap(new HashMap<>(), 1); + when(stateManager.getState(Scope.CLUSTER)).thenReturn(state); + when(stateManager.replace(eq(state), anyMap(), eq(Scope.CLUSTER))).thenReturn(false, false, true); + + StepVerifier.withVirtualTime(() -> checkpointStore.claimOwnership(Collections.singletonList(partitionOwnership1))) + .thenAwait(Duration.ofSeconds(1)) + .expectNextCount(1) + .verifyComplete(); + + verify(stateManager, times(3)).getState(Scope.CLUSTER); + verify(stateManager, times(3)).replace(eq(state), anyMap(), eq(Scope.CLUSTER)); + verifyNoMoreInteractions(stateManager); + } + + @Test + void testClaimOwnership_ReplaceState_ConcurrentStateModificationException_Failure() throws IOException { + StateMap state = new MockStateMap(new HashMap<>(), 1); + when(stateManager.getState(Scope.CLUSTER)).thenReturn(state); + when(stateManager.replace(eq(state), anyMap(), eq(Scope.CLUSTER))).thenReturn(false); + + StepVerifier.withVirtualTime(() -> checkpointStore.claimOwnership(Collections.singletonList(partitionOwnership1))) + .thenAwait(Duration.ofSeconds(10)) + .expectError(ConcurrentStateModificationException.class) + .verify(); + + verify(stateManager, times(11)).getState(Scope.CLUSTER); + verify(stateManager, times(11)).replace(eq(state), anyMap(), eq(Scope.CLUSTER)); + verifyNoMoreInteractions(stateManager); + } + + @Test + void testUpdateCheckpoint_ReplaceState_ConcurrentStateModificationException_Success() throws IOException { + StateMap state = new MockStateMap(new HashMap<>(), 1); + when(stateManager.getState(Scope.CLUSTER)).thenReturn(state); + when(stateManager.replace(eq(state), anyMap(), eq(Scope.CLUSTER))).thenReturn(false, false, true); + + StepVerifier.withVirtualTime(() -> checkpointStore.updateCheckpoint(checkpoint1)) + .thenAwait(Duration.ofSeconds(1)) + .expectNext() + .verifyComplete(); + + verify(stateManager, times(3)).getState(Scope.CLUSTER); + verify(stateManager, times(3)).replace(eq(state), anyMap(), eq(Scope.CLUSTER)); + verifyNoMoreInteractions(stateManager); + } + + @Test + void testUpdateCheckpoint_ReplaceState_ConcurrentStateModificationException_Failure() throws IOException { + StateMap state = new MockStateMap(new HashMap<>(), 1); + when(stateManager.getState(Scope.CLUSTER)).thenReturn(state); + when(stateManager.replace(eq(state), anyMap(), eq(Scope.CLUSTER))).thenReturn(false); + + StepVerifier.withVirtualTime(() -> checkpointStore.updateCheckpoint(checkpoint1)) + .thenAwait(Duration.ofSeconds(10)) + .expectError(ConcurrentStateModificationException.class) + .verify(); + + verify(stateManager, times(11)).getState(Scope.CLUSTER); + verify(stateManager, times(11)).replace(eq(state), anyMap(), eq(Scope.CLUSTER)); + verifyNoMoreInteractions(stateManager); + } + + @Test + void testCleanUp_ReplaceState_ConcurrentStateModificationException_Success() throws IOException { + StateMap state = new MockStateMap(new HashMap<>(initMap(partitionOwnership1)), 1); + when(stateManager.getState(Scope.CLUSTER)).thenReturn(state); + when(stateManager.replace(eq(state), anyMap(), eq(Scope.CLUSTER))).thenReturn(false, false, true); + + StepVerifier.withVirtualTime(() -> checkpointStore.cleanUpMono(EVENT_HUB_NAMESPACE, EVENT_HUB_NAME, CONSUMER_GROUP + "-2")) + .thenAwait(Duration.ofSeconds(1)) + .expectNext() + .verifyComplete(); + + verify(stateManager, times(3)).getState(Scope.CLUSTER); + verify(stateManager, times(3)).replace(eq(state), anyMap(), eq(Scope.CLUSTER)); + verifyNoMoreInteractions(stateManager); + } + + @Test + void testCleanUp_ReplaceState_ConcurrentStateModificationException_Failure() throws IOException { + StateMap state = new MockStateMap(new HashMap<>(initMap(partitionOwnership1)), 1); + when(stateManager.getState(Scope.CLUSTER)).thenReturn(state); + when(stateManager.replace(eq(state), anyMap(), eq(Scope.CLUSTER))).thenReturn(false); + + StepVerifier.withVirtualTime(() -> checkpointStore.cleanUpMono(EVENT_HUB_NAMESPACE, EVENT_HUB_NAME, CONSUMER_GROUP + "-2")) + .thenAwait(Duration.ofSeconds(10)) + .expectError(ConcurrentStateModificationException.class) + .verify(); + + verify(stateManager, times(11)).getState(Scope.CLUSTER); + verify(stateManager, times(11)).replace(eq(state), anyMap(), eq(Scope.CLUSTER)); + verifyNoMoreInteractions(stateManager); + } + +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/checkpoint/ComponentStateCheckpointStoreTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/checkpoint/ComponentStateCheckpointStoreTest.java new file mode 100644 index 0000000000..33e68ed141 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/checkpoint/ComponentStateCheckpointStoreTest.java @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.eventhub.checkpoint; + +import com.azure.messaging.eventhubs.models.Checkpoint; +import com.azure.messaging.eventhubs.models.PartitionOwnership; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.processors.azure.eventhub.checkpoint.exception.ClusterNodeDisconnectedException; +import org.apache.nifi.state.MockStateManager; +import org.apache.nifi.state.MockStateManager.ExecutionMode; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.nifi.processors.azure.eventhub.checkpoint.CheckpointStoreKey.CLIENT_ID; +import static org.apache.nifi.processors.azure.eventhub.checkpoint.CheckpointStoreKey.CLUSTERED; +import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.createCheckpointKey; +import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.createCheckpointValue; +import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.createOwnershipKey; +import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.createOwnershipValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class ComponentStateCheckpointStoreTest extends AbstractComponentStateCheckpointStoreTest { + + private MockStateManager stateManager; + + @Override + StateManager getStateManager() { + stateManager = new MockStateManager(new Object()); + stateManager.setIgnoreAnnotations(true); + return stateManager; + } + + @BeforeEach + void beforeEach() throws IOException { + stateManager.setExecutionMode(ExecutionMode.CLUSTERED); + + addToState(CLIENT_ID.key(), CLIENT_ID_1, Scope.LOCAL); + addToState(CLUSTERED.key(), "true", Scope.LOCAL); + } + + @Test + void testOwnershipStoredInState() throws IOException { + checkpointStore.claimOwnership(Collections.singletonList(partitionOwnership1)).blockLast(); + + String expectedOwnershipKey = "ownership/my-event-hub-namespace/my-event-hub-name/my-consumer-group/1"; + stateManager.assertStateSet(expectedOwnershipKey, Scope.CLUSTER); + + String ownershipValue = stateManager.getState(Scope.CLUSTER).get(expectedOwnershipKey); + assertTrue(ownershipValue.matches("client-id-1/\\d+/.+")); + } + + @Test + void testCheckpointStoredInState() { + checkpointStore.updateCheckpoint(checkpoint1).block(); + + String expectedCheckpointKey = "checkpoint/my-event-hub-namespace/my-event-hub-name/my-consumer-group/1"; + String expectedCheckpointValue = "10/1"; + stateManager.assertStateEquals(expectedCheckpointKey, expectedCheckpointValue, Scope.CLUSTER); + } + + @Test + void testListing() throws IOException { + initStateWithAllItems(); + + testListOwnerships(partitionOwnership1, partitionOwnership2); + testListCheckpoints(checkpoint1, checkpoint2); + } + + @ParameterizedTest + @EnumSource(ExecutionMode.class) + void testCleanUp(ExecutionMode executionMode) throws IOException { + setExecutionMode(executionMode); + + initStateWithAllItems(); + + int expectedBefore = executionMode == ExecutionMode.CLUSTERED ? 10 : 12; + assertEquals(expectedBefore, stateManager.getState(Scope.CLUSTER).toMap().size()); + + checkpointStore.cleanUp(EVENT_HUB_NAMESPACE, EVENT_HUB_NAME, CONSUMER_GROUP); + + int expectedAfter = executionMode == ExecutionMode.CLUSTERED ? 4 : 6; + assertEquals(expectedAfter, stateManager.getState(Scope.CLUSTER).toMap().size()); + + testListOwnerships(partitionOwnership1, partitionOwnership2); + testListCheckpoints(checkpoint1, checkpoint2); + + assertLocalScope(executionMode); + } + + @ParameterizedTest + @EnumSource(ExecutionMode.class) + void testClaimOwnership(ExecutionMode executionMode) throws IOException { + setExecutionMode(executionMode); + + addToState(partitionOwnership1); + + addToState(checkpoint1); + + List requestedOwnerships = Collections.singletonList(partitionOwnership2); + + List claimedOwnerships = checkpointStore.claimOwnership(requestedOwnerships).collectList().block(); + + assertNotNull(claimedOwnerships); + assertEquals(1, claimedOwnerships.size()); + + PartitionOwnership claimedOwnership = claimedOwnerships.get(0); + assertClaimedOwnership(partitionOwnership2, claimedOwnership); + + assertStoredOwnerships(partitionOwnership1, convertToTestable(claimedOwnership)); + assertStoredCheckpoints(checkpoint1); + + assertLocalScope(executionMode); + } + + @Test + void testRenewOwnership() throws IOException { + addToState(partitionOwnership1); + addToState(partitionOwnership2); + + addToState(checkpoint1); + addToState(checkpoint2); + + List requestedOwnerships = Collections.singletonList(partitionOwnership2); + + List claimedOwnerships = checkpointStore.claimOwnership(requestedOwnerships).collectList().block(); + + assertNotNull(claimedOwnerships); + assertEquals(1, claimedOwnerships.size()); + + PartitionOwnership claimedOwnership = claimedOwnerships.get(0); + assertClaimedOwnership(partitionOwnership2, claimedOwnership); + + assertStoredOwnerships(partitionOwnership1, convertToTestable(claimedOwnership)); + assertStoredCheckpoints(checkpoint1, checkpoint2); + } + + @Test + void testStealOwnership() throws IOException { + addToState(partitionOwnership1); + + addToState(checkpoint1); + + PartitionOwnership newOwnership = copy(partitionOwnership1) + .setOwnerId(CLIENT_ID_2); + + List requestedOwnerships = Collections.singletonList(newOwnership); + + List claimedOwnerships = checkpointStore.claimOwnership(requestedOwnerships).collectList().block(); + + assertNotNull(claimedOwnerships); + assertEquals(1, claimedOwnerships.size()); + + PartitionOwnership claimedOwnership = claimedOwnerships.get(0); + assertClaimedOwnership(newOwnership, claimedOwnership); + + assertStoredOwnerships(convertToTestable(claimedOwnership)); + assertStoredCheckpoints(checkpoint1); + } + + @Test + void testClaimMultipleOwnerships() { + partitionOwnership2.setOwnerId(CLIENT_ID_1); + + List requestedOwnerships = Arrays.asList(partitionOwnership1, partitionOwnership2); + + List claimedOwnerships = checkpointStore.claimOwnership(requestedOwnerships).collectList().block(); + + assertNotNull(claimedOwnerships); + assertEquals(2, claimedOwnerships.size()); + + PartitionOwnership claimedOwnership1; + PartitionOwnership claimedOwnership2; + + if (claimedOwnerships.get(0).getPartitionId().equals(PARTITION_ID_1)) { + claimedOwnership1 = claimedOwnerships.get(0); + claimedOwnership2 = claimedOwnerships.get(1); + } else { + claimedOwnership1 = claimedOwnerships.get(1); + claimedOwnership2 = claimedOwnerships.get(2); + } + + assertClaimedOwnership(partitionOwnership1, claimedOwnership1); + assertClaimedOwnership(partitionOwnership2, claimedOwnership2); + + assertStoredOwnerships(convertToTestable(claimedOwnership1), convertToTestable(claimedOwnership2)); + } + + @Test + void testClaimOwnershipUnsuccessful() { + List requestedOwnershipsA = Collections.singletonList(partitionOwnership1); + List requestedOwnershipsB = Collections.singletonList(partitionOwnership1); + + List claimedOwnershipsA = checkpointStore.claimOwnership(requestedOwnershipsA).collectList().block(); + + List claimedOwnershipsB = checkpointStore.claimOwnership(requestedOwnershipsB).collectList().block(); + + assertNotNull(claimedOwnershipsB); + assertEquals(0, claimedOwnershipsB.size()); + + assertStoredOwnerships(convertToTestable(claimedOwnershipsA.get(0))); + } + + @Test + void testClaimMultipleOwnershipsPartialSuccess() { + List requestedOwnershipsA = Collections.singletonList(partitionOwnership1); + List requestedOwnershipsB = Arrays.asList(partitionOwnership1, partitionOwnership2); + + List claimedOwnershipsA = checkpointStore.claimOwnership(requestedOwnershipsA).collectList().block(); + + List claimedOwnershipsB = checkpointStore.claimOwnership(requestedOwnershipsB).collectList().block(); + + assertNotNull(claimedOwnershipsB); + assertEquals(1, claimedOwnershipsB.size()); + + assertStoredOwnerships(convertToTestable(claimedOwnershipsA.get(0)), convertToTestable(claimedOwnershipsB.get(0))); + } + + @Test + void testUnclaimOwnership() throws IOException { + addToState(partitionOwnership1); + + addToState(checkpoint1); + + PartitionOwnership ownershipToUnclaim = copy(partitionOwnership1) + .setOwnerId(""); + + List requestedOwnerships = Collections.singletonList(ownershipToUnclaim); + + List returnedOwnerships = checkpointStore.claimOwnership(requestedOwnerships).collectList().block(); + + assertNotNull(returnedOwnerships); + assertEquals(1, returnedOwnerships.size()); + + PartitionOwnership unclaimedOwnership = returnedOwnerships.get(0); + assertClaimedOwnership(ownershipToUnclaim, unclaimedOwnership); + + assertStoredOwnerships(convertToTestable(unclaimedOwnership)); + assertStoredCheckpoints(checkpoint1); + } + + @ParameterizedTest + @EnumSource(ExecutionMode.class) + void testNewCheckpoint(ExecutionMode executionMode) throws IOException { + setExecutionMode(executionMode); + + addToState(partitionOwnership1); + addToState(partitionOwnership2); + + addToState(checkpoint1); + + Checkpoint newCheckpoint = createCheckpoint(PARTITION_ID_2, 20L, 2L); + + checkpointStore.updateCheckpoint(newCheckpoint).block(); + + assertStoredCheckpoints(checkpoint1, newCheckpoint); + assertStoredOwnerships(partitionOwnership1, partitionOwnership2); + + assertLocalScope(executionMode); + } + + @Test + void testUpdatedCheckpoint() throws IOException { + addToState(partitionOwnership1); + addToState(partitionOwnership2); + + addToState(checkpoint1); + addToState(checkpoint2); + + Checkpoint updatedCheckpoint = copy(checkpoint2) + .setOffset(20L) + .setSequenceNumber(2L); + + checkpointStore.updateCheckpoint(updatedCheckpoint).block(); + + assertStoredCheckpoints(checkpoint1, updatedCheckpoint); + assertStoredOwnerships(partitionOwnership1, partitionOwnership2); + } + + @Test + void testCheckpointWithNullOffset() { + checkpoint1.setOffset(null); + + checkpointStore.updateCheckpoint(checkpoint1).block(); + + assertStoredCheckpoints(checkpoint1); + } + + @Test + void testCheckpointWithNullSequenceNumber() { + checkpoint1.setSequenceNumber(null); + + checkpointStore.updateCheckpoint(checkpoint1).block(); + + assertStoredCheckpoints(checkpoint1); + } + + @Test + void testDisconnectedNode() { + stateManager.setExecutionMode(ExecutionMode.STANDALONE); + + assertThrows(ClusterNodeDisconnectedException.class, () -> checkpointStore.claimOwnership(Collections.singletonList(partitionOwnership1)).collectList().block()); + + assertThrows(ClusterNodeDisconnectedException.class, () -> checkpointStore.updateCheckpoint(checkpoint1).block()); + } + + private void setExecutionMode(ExecutionMode executionMode) throws IOException { + stateManager.setExecutionMode(executionMode); + addToState(CLUSTERED.key(), Boolean.toString(executionMode == ExecutionMode.CLUSTERED), Scope.LOCAL); + } + + private void testListOwnerships(PartitionOwnership... expectedPartitionOwnerships) { + List partitionOwnerships = checkpointStore.listOwnership(EVENT_HUB_NAMESPACE, EVENT_HUB_NAME, CONSUMER_GROUP).collectList().block(); + + assertNotNull(partitionOwnerships); + assertEquals(expectedPartitionOwnerships.length, partitionOwnerships.size()); + assertThat(convertToTestablePartitionOwnerships(partitionOwnerships), containsInAnyOrder(expectedPartitionOwnerships)); + } + + private void testListCheckpoints(Checkpoint... expectedCheckpoints) { + List checkpoints = checkpointStore.listCheckpoints(EVENT_HUB_NAMESPACE, EVENT_HUB_NAME, CONSUMER_GROUP).collectList().block(); + + assertNotNull(checkpoints); + assertEquals(expectedCheckpoints.length, checkpoints.size()); + assertThat(convertToTestableCheckpoints(checkpoints), containsInAnyOrder(expectedCheckpoints)); + } + + private void assertStoredOwnerships(PartitionOwnership... expectedPartitionOwnerships) { + testListOwnerships(expectedPartitionOwnerships); + } + + private void assertStoredCheckpoints(Checkpoint... expectedCheckpoints) { + testListCheckpoints(expectedCheckpoints); + } + + private void assertLocalScope(ExecutionMode executionMode) { + stateManager.assertStateEquals(CLIENT_ID.key(), CLIENT_ID_1, Scope.LOCAL); + stateManager.assertStateEquals(CLUSTERED.key(), Boolean.toString(executionMode == ExecutionMode.CLUSTERED), Scope.LOCAL); + } + + private void addToState(PartitionOwnership partitionOwnership) throws IOException { + setETagAndLastModified(partitionOwnership); + + addToState(createOwnershipKey(partitionOwnership), createOwnershipValue(partitionOwnership), Scope.CLUSTER); + } + + private void addToState(Checkpoint checkpoint) throws IOException { + addToState(createCheckpointKey(checkpoint), createCheckpointValue(checkpoint), Scope.CLUSTER); + } + + private void addToState(String key, String value, Scope scope) throws IOException { + Map map = new HashMap<>(stateManager.getState(scope).toMap()); + + map.put(key, value); + + stateManager.setState(map, scope); + } + + private void initStateWithAllItems() throws IOException { + addToState(partitionOwnership1); + addToState(partitionOwnership2); + + addToState(checkpoint1); + addToState(checkpoint2); + + addToState(createPartitionOwnership(EVENT_HUB_NAMESPACE + "-2", EVENT_HUB_NAME, CONSUMER_GROUP, PARTITION_ID_1, CLIENT_ID_1)); + addToState(createPartitionOwnership(EVENT_HUB_NAMESPACE, EVENT_HUB_NAME + "-2", CONSUMER_GROUP, PARTITION_ID_1, CLIENT_ID_1)); + addToState(createPartitionOwnership(EVENT_HUB_NAMESPACE, EVENT_HUB_NAME, CONSUMER_GROUP + "-2", PARTITION_ID_1, CLIENT_ID_1)); + + addToState(createCheckpoint(EVENT_HUB_NAMESPACE + "-2", EVENT_HUB_NAME, CONSUMER_GROUP, PARTITION_ID_1, OFFSET, SEQUENCE_NUMBER)); + addToState(createCheckpoint(EVENT_HUB_NAMESPACE, EVENT_HUB_NAME + "-2", CONSUMER_GROUP, PARTITION_ID_1, OFFSET, SEQUENCE_NUMBER)); + addToState(createCheckpoint(EVENT_HUB_NAMESPACE, EVENT_HUB_NAME, CONSUMER_GROUP + "-2", PARTITION_ID_1, OFFSET, SEQUENCE_NUMBER)); + } + +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/checkpoint/ComponentStateCheckpointStoreUtilsTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/checkpoint/ComponentStateCheckpointStoreUtilsTest.java new file mode 100644 index 0000000000..5bfcf7ac34 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/checkpoint/ComponentStateCheckpointStoreUtilsTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.eventhub.checkpoint; + +import com.azure.messaging.eventhubs.models.Checkpoint; +import com.azure.messaging.eventhubs.models.PartitionContext; +import com.azure.messaging.eventhubs.models.PartitionOwnership; +import org.junit.jupiter.api.Test; + +import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.convertCheckpoint; +import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.convertOwnership; +import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.convertPartitionContext; +import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.createCheckpointKey; +import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.createCheckpointValue; +import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.createOwnershipKey; +import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.createOwnershipValue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +class ComponentStateCheckpointStoreUtilsTest extends AbstractCheckpointStoreTest { + + private static final String OWNERSHIP_KEY = "ownership/my-event-hub-namespace/my-event-hub-name/my-consumer-group/1"; + private static final String OWNERSHIP_VALUE = "client-id-1/1234567890/my-etag"; + + private static final String CHECKPOINT_KEY = "checkpoint/my-event-hub-namespace/my-event-hub-name/my-consumer-group/1"; + private static final String CHECKPOINT_VALUE = "10/1"; + + @Test + void testConvertOwnership() { + PartitionOwnership partitionOwnership = convertOwnership(OWNERSHIP_KEY, OWNERSHIP_VALUE); + + assertNotNull(partitionOwnership); + + assertEquals(EVENT_HUB_NAMESPACE, partitionOwnership.getFullyQualifiedNamespace()); + assertEquals(EVENT_HUB_NAME, partitionOwnership.getEventHubName()); + assertEquals(CONSUMER_GROUP, partitionOwnership.getConsumerGroup()); + assertEquals(PARTITION_ID_1, partitionOwnership.getPartitionId()); + + assertEquals(CLIENT_ID_1, partitionOwnership.getOwnerId()); + assertEquals(LAST_MODIFIED_TIME, partitionOwnership.getLastModifiedTime()); + assertEquals(ETAG, partitionOwnership.getETag()); + } + + @Test + void testConvertCheckpoint() { + Checkpoint checkpoint = convertCheckpoint(CHECKPOINT_KEY, CHECKPOINT_VALUE); + + assertNotNull(checkpoint); + + assertEquals(EVENT_HUB_NAMESPACE, checkpoint.getFullyQualifiedNamespace()); + assertEquals(EVENT_HUB_NAME, checkpoint.getEventHubName()); + assertEquals(CONSUMER_GROUP, checkpoint.getConsumerGroup()); + assertEquals(PARTITION_ID_1, checkpoint.getPartitionId()); + + assertEquals(OFFSET, checkpoint.getOffset()); + assertEquals(SEQUENCE_NUMBER, checkpoint.getSequenceNumber()); + } + + @Test + void testConvertPartitionContextFromOwnershipKey() { + PartitionContext partitionContext = convertPartitionContext(OWNERSHIP_KEY); + + assertNotNull(partitionContext); + + assertEquals(EVENT_HUB_NAMESPACE, partitionContext.getFullyQualifiedNamespace()); + assertEquals(EVENT_HUB_NAME, partitionContext.getEventHubName()); + assertEquals(CONSUMER_GROUP, partitionContext.getConsumerGroup()); + assertEquals(PARTITION_ID_1, partitionContext.getPartitionId()); + } + + @Test + void testConvertPartitionContextFromCheckpointKey() { + PartitionContext partitionContext = convertPartitionContext(CHECKPOINT_KEY); + + assertNotNull(partitionContext); + + assertEquals(EVENT_HUB_NAMESPACE, partitionContext.getFullyQualifiedNamespace()); + assertEquals(EVENT_HUB_NAME, partitionContext.getEventHubName()); + assertEquals(CONSUMER_GROUP, partitionContext.getConsumerGroup()); + assertEquals(PARTITION_ID_1, partitionContext.getPartitionId()); + } + + @Test + void testOwnershipKey() { + String ownershipKey = createOwnershipKey(partitionOwnership1); + + assertEquals(OWNERSHIP_KEY, ownershipKey); + } + + @Test + void testOwnershipValue() { + partitionOwnership1 + .setLastModifiedTime(LAST_MODIFIED_TIME) + .setETag(ETAG); + + String ownershipValue = createOwnershipValue(partitionOwnership1); + + assertEquals(OWNERSHIP_VALUE, ownershipValue); + } + + @Test + void testCheckpointKey() { + String checkpointKey = createCheckpointKey(checkpoint1); + + assertEquals(CHECKPOINT_KEY, checkpointKey); + } + + @Test + void testCheckpointValue() { + String checkpointValue = createCheckpointValue(checkpoint1); + + assertEquals(CHECKPOINT_VALUE, checkpointValue); + } + +}