NIFI-11294 Support Component State Checkpoints in ConsumeAzureEventHub

This closes #8013

Signed-off-by: David Handermann <exceptionfactory@apache.org>
Co-authored-by: Peter Turcsanyi <turcsanyip@apache.org>
Co-authored-by: Malthe Borch <mborch@gmail.com>
Peter Turcsanyi 2023-11-12 21:55:25 +01:00 committed by exceptionfactory
parent c719761a51
commit 2a3a7d9379
19 changed files with 2129 additions and 45 deletions

View File

@ -29,10 +29,20 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.nifi.state.MockStateManager.ExecutionMode.CLUSTERED;
import static org.apache.nifi.state.MockStateManager.ExecutionMode.STANDALONE;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class MockStateManager implements StateManager {
public enum ExecutionMode {
// in CLUSTERED mode separate state maps are used for the CLUSTER and the LOCAL scopes
CLUSTERED,
// in STANDALONE mode the same state map (the local one) is used for both the CLUSTER and the LOCAL scopes
STANDALONE
}
private final AtomicInteger versionIndex = new AtomicInteger(0);
private StateMap localStateMap = new MockStateMap(null, -1L);
@ -50,6 +60,8 @@ public class MockStateManager implements StateManager {
private final boolean usesLocalState;
private final boolean usesClusterState;
private ExecutionMode executionMode = CLUSTERED;
public MockStateManager(final Object component) {
final Stateful stateful = component.getClass().getAnnotation(Stateful.class);
if (stateful == null) {
@ -78,13 +90,17 @@ public class MockStateManager implements StateManager {
localStateMap = new MockStateMap(null, -1L);
}
public void setExecutionMode(ExecutionMode executionMode) {
this.executionMode = executionMode;
}
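// Illustrative usage (a sketch, assuming "runner" is a TestRunner for the component under test):
// switching the mock into STANDALONE mode makes CLUSTER-scoped reads and writes operate on the
// local state map, mirroring a standalone NiFi instance.
//
//     MockStateManager stateManager = runner.getStateManager();
//     stateManager.setExecutionMode(MockStateManager.ExecutionMode.STANDALONE);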
@Override
public synchronized void setState(final Map<String, String> state, final Scope scope) throws IOException {
verifyAnnotation(scope);
verifyCanSet(scope);
final StateMap stateMap = new MockStateMap(state, versionIndex.incrementAndGet());
if (scope == Scope.CLUSTER) {
if (scope == Scope.CLUSTER && executionMode == CLUSTERED) {
clusterStateMap = stateMap;
} else {
localStateMap = stateMap;
@ -102,7 +118,7 @@ public class MockStateManager implements StateManager {
verifyAnnotation(scope);
if (scope == Scope.CLUSTER) {
clusterRetrievedCount.incrementAndGet();
return clusterStateMap;
return executionMode == CLUSTERED ? clusterStateMap : localStateMap;
} else {
localRetrievedCount.incrementAndGet();
return localStateMap;
@ -120,10 +136,14 @@ public class MockStateManager implements StateManager {
public synchronized boolean replace(final StateMap oldValue, final Map<String, String> newValue, final Scope scope) throws IOException {
verifyAnnotation(scope);
if (scope == Scope.CLUSTER) {
if (oldValue == clusterStateMap) {
if (executionMode == CLUSTERED && oldValue == clusterStateMap) {
verifyCanSet(scope);
clusterStateMap = new MockStateMap(newValue, versionIndex.incrementAndGet());
return true;
} else if (executionMode == STANDALONE && oldValue == localStateMap) {
verifyCanSet(scope);
localStateMap = new MockStateMap(newValue, versionIndex.incrementAndGet());
return true;
}
return false;
@ -176,7 +196,7 @@ public class MockStateManager implements StateManager {
private String getValue(final String key, final Scope scope) {
final StateMap stateMap;
if (scope == Scope.CLUSTER) {
if (scope == Scope.CLUSTER && executionMode == CLUSTERED) {
stateMap = clusterStateMap;
} else {
stateMap = localStateMap;

View File

@ -199,6 +199,12 @@
<version>2.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.5.11</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.azure.eventhub;
import com.azure.core.amqp.AmqpClientOptions;
import com.azure.core.amqp.AmqpTransportType;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpException;
@ -24,6 +25,7 @@ import com.azure.core.http.ProxyOptions;
import com.azure.core.util.HttpClientOptions;
import com.azure.identity.ManagedIdentityCredential;
import com.azure.identity.ManagedIdentityCredentialBuilder;
import com.azure.messaging.eventhubs.CheckpointStore;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
@ -36,16 +38,20 @@ import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
@ -58,6 +64,9 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.eventhub.checkpoint.CheckpointStrategy;
import org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStore;
import org.apache.nifi.processors.azure.eventhub.checkpoint.exception.ComponentStateCheckpointStoreException;
import org.apache.nifi.processors.azure.eventhub.position.EarliestEventPositionProvider;
import org.apache.nifi.processors.azure.eventhub.position.LegacyBlobStorageEventPositionProvider;
import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
@ -81,15 +90,19 @@ import java.io.OutputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import static org.apache.commons.lang3.StringUtils.defaultIfBlank;
import static org.apache.nifi.processors.azure.eventhub.checkpoint.CheckpointStoreKey.CLIENT_ID;
import static org.apache.nifi.processors.azure.eventhub.checkpoint.CheckpointStoreKey.CLUSTERED;
@Tags({"azure", "microsoft", "cloud", "eventhub", "events", "streaming", "streams"})
@CapabilityDescription("Receives messages from Microsoft Azure Event Hubs with checkpointing to ensure consistent event processing. "
@ -98,6 +111,8 @@ import static org.apache.commons.lang3.StringUtils.defaultIfBlank;
+ "In clustered environment, ConsumeAzureEventHub processor instances form a consumer group and the messages are distributed among the cluster nodes "
+ "(each message is processed on one cluster node only).")
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, description = "Local state is used to store the client id. " +
"Cluster state is used to store partition ownership and checkpoint information when component state is configured as the checkpointing strategy.")
@TriggerSerially
@WritesAttributes({
@WritesAttribute(attribute = "eventhub.enqueued.timestamp", description = "The time (in milliseconds since epoch, UTC) at which the message was enqueued in the event hub"),
@ -181,7 +196,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem
static final PropertyDescriptor INITIAL_OFFSET = new PropertyDescriptor.Builder()
.name("event-hub-initial-offset")
.displayName("Initial Offset")
.description("Specify where to start receiving messages if offset is not stored in Azure Storage.")
.description("Specify where to start receiving messages if offset is not yet stored in the checkpoint store.")
.required(true)
.allowableValues(INITIAL_OFFSET_START_OF_STREAM, INITIAL_OFFSET_END_OF_STREAM)
.defaultValue(INITIAL_OFFSET_END_OF_STREAM)
@ -218,12 +233,23 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem
static final PropertyDescriptor RECEIVE_TIMEOUT = new PropertyDescriptor.Builder()
.name("event-hub-message-receive-timeout")
.displayName("Message Receive Timeout")
.description("The amount of time this consumer should wait to receive the Prefetch Count before returning.")
.description("The amount of time this consumer should wait to receive the Batch Size before returning.")
.defaultValue("1 min")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.required(true)
.build();
static final PropertyDescriptor CHECKPOINT_STRATEGY = new PropertyDescriptor.Builder()
.name("checkpoint-strategy")
.displayName("Checkpoint Strategy")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(true)
.allowableValues(CheckpointStrategy.class)
.defaultValue(CheckpointStrategy.AZURE_BLOB_STORAGE.getValue())
.description("Specifies which strategy to use for storing and retrieving partition ownership and checkpoint information for each partition.")
.build();
static final PropertyDescriptor STORAGE_ACCOUNT_NAME = new PropertyDescriptor.Builder()
.name("storage-account-name")
.displayName("Storage Account Name")
@ -231,6 +257,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.required(true)
.dependsOn(CHECKPOINT_STRATEGY, CheckpointStrategy.AZURE_BLOB_STORAGE)
.build();
static final PropertyDescriptor STORAGE_ACCOUNT_KEY = new PropertyDescriptor.Builder()
.name("storage-account-key")
@ -240,6 +267,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.required(false)
.dependsOn(CHECKPOINT_STRATEGY, CheckpointStrategy.AZURE_BLOB_STORAGE)
.build();
static final PropertyDescriptor STORAGE_SAS_TOKEN = new PropertyDescriptor.Builder()
.name("storage-sas-token")
@ -250,6 +278,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem
"Token must start with a ? character."))
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.required(false)
.dependsOn(CHECKPOINT_STRATEGY, CheckpointStrategy.AZURE_BLOB_STORAGE)
.build();
static final PropertyDescriptor STORAGE_CONTAINER_NAME = new PropertyDescriptor.Builder()
.name("storage-container-name")
@ -259,6 +288,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.required(false)
.dependsOn(CHECKPOINT_STRATEGY, CheckpointStrategy.AZURE_BLOB_STORAGE)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
@ -289,6 +319,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem
PREFETCH_COUNT,
BATCH_SIZE,
RECEIVE_TIMEOUT,
CHECKPOINT_STRATEGY,
STORAGE_ACCOUNT_NAME,
STORAGE_ACCOUNT_KEY,
STORAGE_SAS_TOKEN,
@ -301,10 +332,10 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem
private volatile RecordReaderFactory readerFactory;
private volatile RecordSetWriterFactory writerFactory;
private volatile String namespaceName;
private volatile boolean isRecordReaderSet = false;
private volatile boolean isRecordWriterSet = false;
private volatile String serviceBusEndpoint;
private volatile String clientId;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@ -328,6 +359,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem
final ControllerService recordWriter = validationContext.getProperty(RECORD_WRITER).asControllerService();
final String storageAccountKey = validationContext.getProperty(STORAGE_ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
final String storageSasToken = validationContext.getProperty(STORAGE_SAS_TOKEN).evaluateAttributeExpressions().getValue();
final CheckpointStrategy checkpointStrategy = CheckpointStrategy.valueOf(validationContext.getProperty(CHECKPOINT_STRATEGY).getValue());
if ((recordReader != null && recordWriter == null) || (recordReader == null && recordWriter != null)) {
results.add(new ValidationResult.Builder()
@ -338,24 +370,26 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem
.build());
}
if (StringUtils.isBlank(storageAccountKey) && StringUtils.isBlank(storageSasToken)) {
results.add(new ValidationResult.Builder()
.subject(String.format("%s or %s",
STORAGE_ACCOUNT_KEY.getDisplayName(), STORAGE_SAS_TOKEN.getDisplayName()))
.explanation(String.format("either %s or %s should be set.",
STORAGE_ACCOUNT_KEY.getDisplayName(), STORAGE_SAS_TOKEN.getDisplayName()))
.valid(false)
.build());
}
if (checkpointStrategy == CheckpointStrategy.AZURE_BLOB_STORAGE) {
if (StringUtils.isBlank(storageAccountKey) && StringUtils.isBlank(storageSasToken)) {
results.add(new ValidationResult.Builder()
.subject(String.format("%s or %s",
STORAGE_ACCOUNT_KEY.getDisplayName(), STORAGE_SAS_TOKEN.getDisplayName()))
.explanation(String.format("either %s or %s should be set.",
STORAGE_ACCOUNT_KEY.getDisplayName(), STORAGE_SAS_TOKEN.getDisplayName()))
.valid(false)
.build());
}
if (StringUtils.isNotBlank(storageAccountKey) && StringUtils.isNotBlank(storageSasToken)) {
results.add(new ValidationResult.Builder()
.subject(String.format("%s or %s",
STORAGE_ACCOUNT_KEY.getDisplayName(), STORAGE_SAS_TOKEN.getDisplayName()))
.explanation(String.format("%s and %s should not be set at the same time.",
STORAGE_ACCOUNT_KEY.getDisplayName(), STORAGE_SAS_TOKEN.getDisplayName()))
.valid(false)
.build());
if (StringUtils.isNotBlank(storageAccountKey) && StringUtils.isNotBlank(storageSasToken)) {
results.add(new ValidationResult.Builder()
.subject(String.format("%s or %s",
STORAGE_ACCOUNT_KEY.getDisplayName(), STORAGE_SAS_TOKEN.getDisplayName()))
.explanation(String.format("%s and %s should not be set at the same time.",
STORAGE_ACCOUNT_KEY.getDisplayName(), STORAGE_SAS_TOKEN.getDisplayName()))
.valid(false)
.build());
}
}
results.addAll(AzureEventHubUtils.customValidate(ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY, validationContext));
return results;
@ -370,6 +404,24 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem
}
}
@OnScheduled
public void onScheduled(final ProcessContext context) throws IOException {
StateManager stateManager = context.getStateManager();
String clientId = stateManager.getState(Scope.LOCAL).get(CLIENT_ID.key());
if (clientId == null) {
clientId = UUID.randomUUID().toString();
final Map<String, String> clientState = new HashMap<>();
clientState.put(CLIENT_ID.key(), clientId);
clientState.put(CLUSTERED.key(), Boolean.toString(getNodeTypeProvider().isClustered()));
stateManager.setState(clientState, Scope.LOCAL);
}
this.clientId = clientId;
}
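// An illustrative snapshot (hypothetical UUID value) of the LOCAL state written above on the first run:
//   _clientId  -> 3f9c1d2e-5a4b-4c6d-8e7f-0a1b2c3d4e5f
//   _clustered -> true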
@Override
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
if (eventProcessorClient == null) {
@ -398,25 +450,42 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem
processSessionFactory = null;
readerFactory = null;
writerFactory = null;
clientId = null;
}
}
protected EventProcessorClient createClient(final ProcessContext context) {
namespaceName = context.getProperty(NAMESPACE).evaluateAttributeExpressions().getValue();
final String eventHubNamespace = context.getProperty(NAMESPACE).evaluateAttributeExpressions().getValue();
final String eventHubName = context.getProperty(EVENT_HUB_NAME).evaluateAttributeExpressions().getValue();
final String serviceBusEndpoint = context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
final String consumerGroup = context.getProperty(CONSUMER_GROUP).evaluateAttributeExpressions().getValue();
final String containerName = defaultIfBlank(context.getProperty(STORAGE_CONTAINER_NAME).evaluateAttributeExpressions().getValue(), eventHubName);
final String storageConnectionString = createStorageConnectionString(context);
final BlobContainerClientBuilder blobContainerClientBuilder = new BlobContainerClientBuilder()
.connectionString(storageConnectionString)
.containerName(containerName);
final ProxyOptions storageProxyOptions = AzureStorageUtils.getProxyOptions(context);
if (storageProxyOptions != null) {
blobContainerClientBuilder.clientOptions(new HttpClientOptions().setProxyOptions(storageProxyOptions));
final String fullyQualifiedNamespace = String.format("%s%s", eventHubNamespace, serviceBusEndpoint);
final CheckpointStore checkpointStore;
final Map<String, EventPosition> legacyPartitionEventPosition;
final CheckpointStrategy checkpointStrategy = CheckpointStrategy.valueOf(context.getProperty(CHECKPOINT_STRATEGY).getValue());
if (checkpointStrategy == CheckpointStrategy.AZURE_BLOB_STORAGE) {
final String containerName = defaultIfBlank(context.getProperty(STORAGE_CONTAINER_NAME).evaluateAttributeExpressions().getValue(), eventHubName);
final String storageConnectionString = createStorageConnectionString(context);
final BlobContainerClientBuilder blobContainerClientBuilder = new BlobContainerClientBuilder()
.connectionString(storageConnectionString)
.containerName(containerName);
final ProxyOptions storageProxyOptions = AzureStorageUtils.getProxyOptions(context);
if (storageProxyOptions != null) {
blobContainerClientBuilder.clientOptions(new HttpClientOptions().setProxyOptions(storageProxyOptions));
}
final BlobContainerAsyncClient blobContainerAsyncClient = blobContainerClientBuilder.buildAsyncClient();
checkpointStore = new BlobCheckpointStore(blobContainerAsyncClient);
legacyPartitionEventPosition = getLegacyPartitionEventPosition(blobContainerAsyncClient, consumerGroup);
} else {
ComponentStateCheckpointStore componentStateCheckpointStore = new ComponentStateCheckpointStore(clientId, context.getStateManager());
componentStateCheckpointStore.cleanUp(fullyQualifiedNamespace, eventHubName, consumerGroup);
checkpointStore = componentStateCheckpointStore;
legacyPartitionEventPosition = Collections.emptyMap();
}
final BlobContainerAsyncClient blobContainerAsyncClient = blobContainerClientBuilder.buildAsyncClient();
final BlobCheckpointStore checkpointStore = new BlobCheckpointStore(blobContainerAsyncClient);
final Long receiveTimeout = context.getProperty(RECEIVE_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
final Duration maxWaitTime = Duration.ofMillis(receiveTimeout);
@ -426,12 +495,12 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem
final EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder()
.transportType(transportType)
.consumerGroup(consumerGroup)
.clientOptions(new AmqpClientOptions().setIdentifier(clientId))
.trackLastEnqueuedEventProperties(true)
.checkpointStore(checkpointStore)
.processError(errorProcessor)
.processEventBatch(eventBatchProcessor, maxBatchSize, maxWaitTime);
final String fullyQualifiedNamespace = String.format("%s%s", namespaceName, serviceBusEndpoint);
final boolean useManagedIdentity = context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
if (useManagedIdentity) {
final ManagedIdentityCredentialBuilder managedIdentityCredentialBuilder = new ManagedIdentityCredentialBuilder();
@ -439,7 +508,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem
eventProcessorClientBuilder.credential(fullyQualifiedNamespace, eventHubName, managedIdentityCredential);
} else {
final String policyName = context.getProperty(ACCESS_POLICY_NAME).evaluateAttributeExpressions().getValue();
final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).evaluateAttributeExpressions().getValue();
final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
final AzureNamedKeyCredential azureNamedKeyCredential = new AzureNamedKeyCredential(policyName, policyKey);
eventProcessorClientBuilder.credential(fullyQualifiedNamespace, eventHubName, azureNamedKeyCredential);
}
@ -449,7 +518,6 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem
eventProcessorClientBuilder.prefetchCount(prefetchCount);
}
final Map<String, EventPosition> legacyPartitionEventPosition = getLegacyPartitionEventPosition(blobContainerAsyncClient, consumerGroup);
if (legacyPartitionEventPosition.isEmpty()) {
final String initialOffset = context.getProperty(INITIAL_OFFSET).getValue();
// EventPosition.latest() is the default behavior in the absence of existing checkpoints
@ -468,9 +536,8 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem
}
protected String getTransitUri(final PartitionContext partitionContext) {
return String.format("amqps://%s%s/%s/ConsumerGroups/%s/Partitions/%s",
namespaceName,
serviceBusEndpoint,
return String.format("amqps://%s/%s/ConsumerGroups/%s/Partitions/%s",
partitionContext.getFullyQualifiedNamespace(),
partitionContext.getEventHubName(),
partitionContext.getConsumerGroup(),
partitionContext.getPartitionId()
@ -520,7 +587,14 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem
}
}
getLogger().error("Receive Events failed Namespace [{}] Event Hub [{}] Consumer Group [{}] Partition [{}]",
final String errorMessage;
if (throwable instanceof ComponentStateCheckpointStoreException) {
errorMessage = "Failed to access Component State Checkpoint Store";
} else {
errorMessage = "Receive Events failed";
}
getLogger().error(errorMessage + ". Namespace [{}] Event Hub [{}] Consumer Group [{}] Partition [{}]",
partitionContext.getFullyQualifiedNamespace(),
partitionContext.getEventHubName(),
partitionContext.getConsumerGroup(),
@ -666,8 +740,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem
private String createStorageConnectionString(final ProcessContext context) {
final String storageAccountName = context.getProperty(STORAGE_ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
serviceBusEndpoint = context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
final String serviceBusEndpoint = context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
final String domainName = serviceBusEndpoint.replace(".servicebus.", "");
final String storageAccountKey = context.getProperty(STORAGE_ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
final String storageSasToken = context.getProperty(STORAGE_SAS_TOKEN).evaluateAttributeExpressions().getValue();

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.eventhub.checkpoint;
public enum CheckpointStoreKey {
CLIENT_ID("_clientId"),
CLUSTERED("_clustered");
private final String key;
CheckpointStoreKey(String key) {
this.key = key;
}
public String key() {
return key;
}
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.eventhub.checkpoint;
public enum CheckpointStoreKeyPrefix {
OWNERSHIP("ownership"),
CHECKPOINT("checkpoint");
private final String keyPrefix;
CheckpointStoreKeyPrefix(String keyPrefix) {
this.keyPrefix = keyPrefix;
}
public String keyPrefix() {
return keyPrefix;
}
}

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.eventhub.checkpoint;
import org.apache.nifi.components.DescribedValue;
public enum CheckpointStrategy implements DescribedValue {
AZURE_BLOB_STORAGE("Azure Blob Storage", "Use Azure Blob Storage to store partition ownership and checkpoint information"),
COMPONENT_STATE("Component State", "Use component state to store partition ownership and checkpoint information");
private final String label;
private final String description;
CheckpointStrategy(String label, String description) {
this.label = label;
this.description = description;
}
@Override
public String getValue() {
return this.name();
}
@Override
public String getDisplayName() {
return label;
}
@Override
public String getDescription() {
return description;
}
}

View File

@ -0,0 +1,333 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.eventhub.checkpoint;
import com.azure.core.util.CoreUtils;
import com.azure.messaging.eventhubs.CheckpointStore;
import com.azure.messaging.eventhubs.models.Checkpoint;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.messaging.eventhubs.models.PartitionOwnership;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.azure.eventhub.checkpoint.exception.ClusterNodeDisconnectedException;
import org.apache.nifi.processors.azure.eventhub.checkpoint.exception.ConcurrentStateModificationException;
import org.apache.nifi.processors.azure.eventhub.checkpoint.exception.StateNotAvailableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.apache.nifi.processors.azure.eventhub.checkpoint.CheckpointStoreKey.CLUSTERED;
import static org.apache.nifi.processors.azure.eventhub.checkpoint.CheckpointStoreKeyPrefix.CHECKPOINT;
import static org.apache.nifi.processors.azure.eventhub.checkpoint.CheckpointStoreKeyPrefix.OWNERSHIP;
import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.checkpointToString;
import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.convertOwnership;
import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.convertPartitionContext;
import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.createCheckpointKey;
import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.createCheckpointValue;
import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.createOwnershipKey;
import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.createOwnershipValue;
import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.ownershipListToString;
import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.ownershipToString;
/**
* The {@link com.azure.messaging.eventhubs.CheckpointStore} is responsible for managing the storage of partition ownership and checkpoint information for Azure Event Hubs consumers.
* The underlying storage has to be persistent and centralized (shared across the consumer clients in the consumer group).
* <p>
* There exist one ownership entry and one checkpoint entry for each partition in the store. They represent {@link com.azure.messaging.eventhubs.models.PartitionOwnership}
* and {@link com.azure.messaging.eventhubs.models.Checkpoint} entities in a storage-specific serialized form.
* <p>
* The checkpoint store is plugged into {@link com.azure.messaging.eventhubs.EventProcessorClient} and directly used by the load balancer algorithm running in each consumer client instance.
* <p>
* {@code ComponentStateCheckpointStore} stores the partition ownership and checkpoint information in the component's (that is {@link org.apache.nifi.processors.azure.eventhub.ConsumeAzureEventHub}
* processor's) state using NiFi's {@link org.apache.nifi.components.state.StateManager} in the background.
* <p>
* The format of the ownership entry in the state map:
* <pre> ownership/event-hub-namespace/event-hub-name/consumer-group/partition-id -> client-id/last-modified-time/etag</pre>
* <p>
* The format of the checkpoint entry in the state map:
* <pre> checkpoint/event-hub-namespace/event-hub-name/consumer-group/partition-id -> offset/sequence-number</pre>
* <p>
 * The checkpoint store is required to provide an optimistic locking mechanism in order to avoid concurrent updates of the same ownership entry, and therefore to prevent multiple client
 * instances from owning the same partition at the same time. The optimistic locking is supposed to be based on the <code>eTag</code> field of {@link com.azure.messaging.eventhubs.models.PartitionOwnership}
 * and should be supported at entry level (only updating the same partition ownership is conflicting; claiming ownership of two different partitions or updating two checkpoints in parallel are
 * valid operations as they are independent changes).
* <p>
* {@link org.apache.nifi.components.state.StateManager#replace(StateMap, Map, Scope)} method supports optimistic locking but only globally, in the scope of the whole state map (which may or may not
* contain conflicting changes after update). For this reason, the state update had to be implemented in 2 phases in {@link ComponentStateCheckpointStore#claimOwnership(List)}:
* <ul>
 * <li>in the 1st phase the algorithm gets the current state and tries to set the ownership in memory based on <code>eTag</code>; the claim request is skipped if the <code>eTag</code>
 * does not match (the original <code>eTag</code> was retrieved in {@link ComponentStateCheckpointStore#listOwnership(String, String, String)})</li>
 * <li>in the 2nd phase {@link org.apache.nifi.components.state.StateManager#replace(StateMap, Map, Scope)} is called to persist the new state; if it is not successful (meaning
 * that another client instance changed the state in the meantime, which may or may not be conflicting), the whole process needs to be started over from the 1st phase</li>
* </ul>
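 * <p>
 * For illustration, with hypothetical names and values, the two entries for partition 0 could look like:
 * <pre> ownership/my-namespace.servicebus.windows.net/my-event-hub/$Default/0 -> 1d1b722f-4bfa-4a3f-9b21-8c1a0a2d5e47/1699822525000/6f1e9a3c-2b7d-4c58-9f0e-3a4b5c6d7e8f</pre>
 * <pre> checkpoint/my-namespace.servicebus.windows.net/my-event-hub/$Default/0 -> 4096/42</pre>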
*/
public class ComponentStateCheckpointStore implements CheckpointStore {
private static final Logger LOGGER = LoggerFactory.getLogger(ComponentStateCheckpointStore.class);
private final String clientId;
private final StateManager stateManager;
public ComponentStateCheckpointStore(String clientId, StateManager stateManager) {
this.clientId = clientId;
this.stateManager = stateManager;
}
/**
* Cleans up the underlying state map and retains only items matching the "EventHub coordinates" passed in ({@code fullyQualifiedNamespace}, {@code eventHubName} and {@code consumerGroup}).
 * The method should be called once in the initialization phase in order to remove obsolete items, though the checkpoint store can also operate properly without this clean-up.
*
* @param fullyQualifiedNamespace the fullyQualifiedNamespace of the items to be retained
* @param eventHubName the eventHubName of the items to be retained
* @param consumerGroup the consumerGroup of the items to be retained
*/
public void cleanUp(String fullyQualifiedNamespace, String eventHubName, String consumerGroup) {
cleanUpMono(fullyQualifiedNamespace, eventHubName, consumerGroup)
.subscribe();
}
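// An illustrative call (hypothetical namespace, Event Hub and consumer group names), as performed by
// ConsumeAzureEventHub when the Component State checkpoint strategy is selected:
//
//     checkpointStore.cleanUp("my-namespace.servicebus.windows.net", "my-event-hub", "$Default");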
Mono<Void> cleanUpMono(String fullyQualifiedNamespace, String eventHubName, String consumerGroup) {
return getState()
.doFirst(() -> debug("cleanUp() -> Entering [{}, {}, {}]", fullyQualifiedNamespace, eventHubName, consumerGroup))
.flatMap(oldState -> {
Map<String, String> newMap = oldState.toMap().entrySet().stream()
.filter(e -> {
String key = e.getKey();
if (!key.startsWith(OWNERSHIP.keyPrefix()) && !key.startsWith(CHECKPOINT.keyPrefix())) {
return true;
}
PartitionContext context = convertPartitionContext(key);
return context.getFullyQualifiedNamespace().equalsIgnoreCase(fullyQualifiedNamespace)
&& context.getEventHubName().equalsIgnoreCase(eventHubName)
&& context.getConsumerGroup().equalsIgnoreCase(consumerGroup);
})
.collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
int removed = oldState.toMap().size() - newMap.size();
if (removed > 0) {
debug("cleanUp() -> Removed {} item(s)", removed);
return updateState(oldState, newMap);
} else {
debug("cleanUp() -> Nothing to clean up");
return Mono.empty();
}
})
.doOnSuccess(__ -> debug("cleanUp() -> Succeeded"))
.retryWhen(createRetrySpec("cleanUp"))
.doOnError(throwable -> debug("cleanUp() -> Failed: {}", throwable.getMessage()));
}
@Override
public Flux<PartitionOwnership> listOwnership(String fullyQualifiedNamespace, String eventHubName, String consumerGroup) {
return getState()
.doFirst(() -> debug("listOwnership() -> Entering [{}, {}, {}]", fullyQualifiedNamespace, eventHubName, consumerGroup))
.flatMapMany(state -> {
checkDisconnectedNode(state);
return getOwnerships(state);
})
.filter(ownership ->
ownership.getFullyQualifiedNamespace().equalsIgnoreCase(fullyQualifiedNamespace)
&& ownership.getEventHubName().equalsIgnoreCase(eventHubName)
&& ownership.getConsumerGroup().equalsIgnoreCase(consumerGroup)
)
.doOnNext(partitionOwnership -> debug("listOwnership() -> Returning {}", ownershipToString(partitionOwnership)))
.doOnComplete(() -> debug("listOwnership() -> Succeeded"))
.doOnError(throwable -> debug("listOwnership() -> Failed: {}", throwable.getMessage()));
}
@Override
public Flux<PartitionOwnership> claimOwnership(List<PartitionOwnership> requestedPartitionOwnerships) {
return getState()
.doFirst(() -> debug("claimOwnership() -> Entering [{}]", ownershipListToString(requestedPartitionOwnerships)))
.flatMapMany(oldState -> {
checkDisconnectedNode(oldState);
Map<String, String> newMap = new HashMap<>(oldState.toMap());
List<PartitionOwnership> claimedOwnerships = new ArrayList<>();
long timestamp = System.currentTimeMillis();
for (PartitionOwnership requestedPartitionOwnership : requestedPartitionOwnerships) {
String key = createOwnershipKey(requestedPartitionOwnership);
if (oldState.get(key) != null) {
PartitionOwnership oldPartitionOwnership = convertOwnership(key, oldState.get(key));
String oldETag = oldPartitionOwnership.getETag();
String reqETag = requestedPartitionOwnership.getETag();
if (StringUtils.isNotEmpty(oldETag) && !oldETag.equals(reqETag)) {
debug("claimOwnership() -> Already claimed {}", ownershipToString(oldPartitionOwnership));
continue;
}
}
String newETag = CoreUtils.randomUuid().toString();
PartitionOwnership partitionOwnership = new PartitionOwnership()
.setFullyQualifiedNamespace(requestedPartitionOwnership.getFullyQualifiedNamespace())
.setEventHubName(requestedPartitionOwnership.getEventHubName())
.setConsumerGroup(requestedPartitionOwnership.getConsumerGroup())
.setPartitionId(requestedPartitionOwnership.getPartitionId())
.setOwnerId(requestedPartitionOwnership.getOwnerId())
.setLastModifiedTime(timestamp)
.setETag(newETag);
claimedOwnerships.add(partitionOwnership);
newMap.put(key, createOwnershipValue(partitionOwnership));
debug("claimOwnership() -> Claiming {}", ownershipToString(partitionOwnership));
}
if (claimedOwnerships.isEmpty()) {
return Flux.empty();
}
return updateState(oldState, newMap)
.thenMany(Flux.fromIterable(claimedOwnerships));
})
.doOnNext(partitionOwnership -> debug("claimOwnership() -> Returning {}", ownershipToString(partitionOwnership)))
.doOnComplete(() -> debug("claimOwnership() -> Succeeded"))
.retryWhen(createRetrySpec("claimOwnership"))
.doOnError(throwable -> debug("claimOwnership() -> Failed: {}", throwable.getMessage()));
}
@Override
public Flux<Checkpoint> listCheckpoints(String fullyQualifiedNamespace, String eventHubName, String consumerGroup) {
return getState()
.doFirst(() -> debug("listCheckpoints() -> Entering [{}, {}, {}]", fullyQualifiedNamespace, eventHubName, consumerGroup))
.flatMapMany(state -> {
checkDisconnectedNode(state);
return getCheckpoints(state);
})
.filter(checkpoint ->
checkpoint.getFullyQualifiedNamespace().equalsIgnoreCase(fullyQualifiedNamespace)
&& checkpoint.getEventHubName().equalsIgnoreCase(eventHubName)
&& checkpoint.getConsumerGroup().equalsIgnoreCase(consumerGroup)
)
.doOnNext(checkpoint -> debug("listCheckpoints() -> Returning {}", checkpointToString(checkpoint)))
.doOnComplete(() -> debug("listCheckpoints() -> Succeeded"))
.doOnError(throwable -> debug("listCheckpoints() -> Failed: {}", throwable.getMessage()));
}
@Override
public Mono<Void> updateCheckpoint(Checkpoint checkpoint) {
return getState()
.doFirst(() -> debug("updateCheckpoint() -> Entering [{}]", checkpointToString(checkpoint)))
.flatMap(oldState -> {
checkDisconnectedNode(oldState);
Map<String, String> newMap = new HashMap<>(oldState.toMap());
newMap.put(createCheckpointKey(checkpoint), createCheckpointValue(checkpoint));
return updateState(oldState, newMap);
})
.doOnSuccess(__ -> debug("updateCheckpoint() -> Succeeded"))
.retryWhen(createRetrySpec("updateCheckpoint"))
.doOnError(throwable -> debug("updateCheckpoint() -> Failed: {}", throwable.getMessage()));
}
private Retry createRetrySpec(String methodName) {
return Retry.max(10)
.filter(t -> t instanceof ConcurrentStateModificationException)
.doBeforeRetry(retrySignal -> debug(methodName + "() -> Retry: {}", retrySignal))
.onRetryExhaustedThrow((retrySpec, retrySignal) -> new ConcurrentStateModificationException(
String.format("Retrials of concurrent state modifications has been exhausted (%d retrials)", 10)));
}
private Flux<PartitionOwnership> getOwnerships(StateMap state) {
return getEntries(state, OWNERSHIP.keyPrefix(), ComponentStateCheckpointStoreUtils::convertOwnership);
}
private Flux<Checkpoint> getCheckpoints(StateMap state) {
return getEntries(state, CHECKPOINT.keyPrefix(), ComponentStateCheckpointStoreUtils::convertCheckpoint);
}
private <T> Flux<T> getEntries(StateMap state, String kind, BiFunction<String, String, T> converter) throws ProcessException {
return state.toMap().entrySet().stream()
.filter(e -> e.getKey().startsWith(kind))
.map(e -> converter.apply(e.getKey(), e.getValue()))
.collect(collectingAndThen(toList(), Flux::fromIterable));
}
private void checkDisconnectedNode(StateMap state) {
// if the _clustered key is present in the state (that is, the local state is being accessed via the CLUSTER scope) and its value is true, then this node has been disconnected from the cluster
boolean disconnectedNode = Boolean.parseBoolean(state.get(CLUSTERED.key()));
if (disconnectedNode) {
throw new ClusterNodeDisconnectedException("The node has been disconnected from the cluster, the checkpoint store is not accessible");
}
}
private void debug(String message, Object... arguments) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[clientId={}] " + message, ArrayUtils.addFirst(arguments, clientId));
}
}
private Mono<StateMap> getState() {
return Mono.defer(
() -> {
try {
StateMap state = stateManager.getState(Scope.CLUSTER);
return Mono.just(state);
} catch (Exception e) {
return Mono.error(new StateNotAvailableException(e));
}
}
);
}
private Mono<Void> updateState(StateMap oldState, Map<String, String> newMap) {
return Mono.defer(
() -> {
try {
boolean success = stateManager.replace(oldState, newMap, Scope.CLUSTER);
if (success) {
return Mono.empty();
} else {
return Mono.error(new ConcurrentStateModificationException(
String.format("Component state with version [%s] has been modified by another instance" , oldState.getStateVersion().orElse("new"))));
}
} catch (Exception e) {
return Mono.error(new StateNotAvailableException(e));
}
}
);
}
}

View File

@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.eventhub.checkpoint;
import com.azure.messaging.eventhubs.models.Checkpoint;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.messaging.eventhubs.models.PartitionOwnership;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.processor.exception.ProcessException;
import java.util.List;
import java.util.stream.Collectors;
import static org.apache.nifi.processors.azure.eventhub.checkpoint.CheckpointStoreKeyPrefix.CHECKPOINT;
import static org.apache.nifi.processors.azure.eventhub.checkpoint.CheckpointStoreKeyPrefix.OWNERSHIP;
final class ComponentStateCheckpointStoreUtils {
private ComponentStateCheckpointStoreUtils() {
}
static PartitionOwnership convertOwnership(String key, String value) {
PartitionContext context = convertPartitionContext(key);
final String[] parts = value.split("/", 3);
if (parts.length != 3) {
throw new ProcessException(String.format("Invalid ownership value: %s", value));
}
return new PartitionOwnership()
.setFullyQualifiedNamespace(context.getFullyQualifiedNamespace())
.setEventHubName(context.getEventHubName())
.setConsumerGroup(context.getConsumerGroup())
.setPartitionId(context.getPartitionId())
.setOwnerId(parts[0])
.setLastModifiedTime(Long.parseLong(parts[1]))
.setETag(parts[2]);
}
static Checkpoint convertCheckpoint(String key, String value) {
PartitionContext context = convertPartitionContext(key);
final String[] parts = value.split("/", 2);
if (parts.length != 2) {
throw new ProcessException(String.format("Invalid checkpoint value: %s", value));
}
return new Checkpoint()
.setFullyQualifiedNamespace(context.getFullyQualifiedNamespace())
.setEventHubName(context.getEventHubName())
.setConsumerGroup(context.getConsumerGroup())
.setPartitionId(context.getPartitionId())
.setOffset(StringUtils.isNotEmpty(parts[0]) ? Long.parseLong(parts[0]) : null)
.setSequenceNumber(StringUtils.isNotEmpty(parts[1]) ? Long.parseLong(parts[1]) : null);
}
static PartitionContext convertPartitionContext(String key) {
final String[] parts = key.split("/", 5);
if (parts.length != 5) {
throw new ProcessException(String.format("Invalid entry key: %s", key));
}
final String fullyQualifiedNamespace = parts[1];
final String eventHubName = parts[2];
final String consumerGroup = parts[3];
final String partitionId = parts[4];
return new PartitionContext(
fullyQualifiedNamespace,
eventHubName,
consumerGroup,
partitionId
);
}
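// For illustration (hypothetical names): the key "ownership/my-namespace.servicebus.windows.net/my-event-hub/$Default/0"
// is parsed into a PartitionContext with fullyQualifiedNamespace "my-namespace.servicebus.windows.net",
// eventHubName "my-event-hub", consumerGroup "$Default" and partitionId "0".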
static String createOwnershipKey(PartitionOwnership partitionOwnership) {
return createKey(
OWNERSHIP.keyPrefix(),
partitionOwnership.getFullyQualifiedNamespace(),
partitionOwnership.getEventHubName(),
partitionOwnership.getConsumerGroup(),
partitionOwnership.getPartitionId()
);
}
static String createCheckpointKey(Checkpoint checkpoint) {
return createKey(
CHECKPOINT.keyPrefix(),
checkpoint.getFullyQualifiedNamespace(),
checkpoint.getEventHubName(),
checkpoint.getConsumerGroup(),
checkpoint.getPartitionId()
);
}
private static String createKey(String kind, String fullyQualifiedNamespace, String eventHubName, String consumerGroup, String partitionId) {
return String.format(
"%s/%s/%s/%s/%s",
kind,
fullyQualifiedNamespace,
eventHubName,
consumerGroup,
partitionId
);
}
static String createOwnershipValue(PartitionOwnership partitionOwnership) {
return String.format("%s/%s/%s",
partitionOwnership.getOwnerId(),
partitionOwnership.getLastModifiedTime(),
partitionOwnership.getETag());
}
static String createCheckpointValue(Checkpoint checkpoint) {
return String.format("%s/%s",
checkpoint.getOffset() != null ? checkpoint.getOffset().toString() : "",
checkpoint.getSequenceNumber() != null ? checkpoint.getSequenceNumber().toString() : "");
}
static String ownershipToString(PartitionOwnership partitionOwnership) {
return "PartitionOwnership{" +
"fullyQualifiedNamespace='" + partitionOwnership.getFullyQualifiedNamespace() + '\'' +
", eventHubName='" + partitionOwnership.getEventHubName() + '\'' +
", consumerGroup='" + partitionOwnership.getConsumerGroup() + '\'' +
", partitionId='" + partitionOwnership.getPartitionId() + '\'' +
", ownerId='" + partitionOwnership.getOwnerId() + '\'' +
", lastModifiedTime=" + partitionOwnership.getLastModifiedTime() +
", eTag='" + partitionOwnership.getETag() + '\'' +
'}';
}
static List<String> ownershipListToString(List<PartitionOwnership> partitionOwnershipList) {
return partitionOwnershipList.stream()
.map(ComponentStateCheckpointStoreUtils::ownershipToString)
.collect(Collectors.toList());
}
static String checkpointToString(Checkpoint checkpoint) {
return "Checkpoint{" +
"fullyQualifiedNamespace='" + checkpoint.getFullyQualifiedNamespace() + '\'' +
", eventHubName='" + checkpoint.getEventHubName() + '\'' +
", consumerGroup='" + checkpoint.getConsumerGroup() + '\'' +
", partitionId='" + checkpoint.getPartitionId() + '\'' +
", offset=" + checkpoint.getOffset() +
", sequenceNumber=" + checkpoint.getSequenceNumber() +
'}';
}
}

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.eventhub.checkpoint.exception;
public class ClusterNodeDisconnectedException extends ComponentStateCheckpointStoreException {
public ClusterNodeDisconnectedException(String message) {
super(message);
}
}

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.eventhub.checkpoint.exception;
public class ComponentStateCheckpointStoreException extends RuntimeException {
public ComponentStateCheckpointStoreException(String message) {
super(message);
}
public ComponentStateCheckpointStoreException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.eventhub.checkpoint.exception;
public class ConcurrentStateModificationException extends ComponentStateCheckpointStoreException {
public ConcurrentStateModificationException(String message) {
super(message);
}
}

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.eventhub.checkpoint.exception;
public class StateNotAvailableException extends ComponentStateCheckpointStoreException {
public StateNotAvailableException(Throwable cause) {
super("Failure when reading/writing the component state", cause);
}
}

View File

@ -24,6 +24,7 @@ import com.azure.messaging.eventhubs.models.EventBatchContext;
import com.azure.messaging.eventhubs.models.PartitionContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.azure.eventhub.checkpoint.CheckpointStrategy;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.proxy.ProxyConfiguration;
@ -223,6 +224,19 @@ public class TestConsumeAzureEventHub {
testRunner.assertValid();
}
@Test
public void testProcessorConfigValidityWithComponentStateCheckpointStrategy() throws InitializationException {
testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME, EVENT_HUB_NAME);
testRunner.assertNotValid();
testRunner.setProperty(ConsumeAzureEventHub.NAMESPACE, EVENT_HUB_NAMESPACE);
testRunner.assertNotValid();
testRunner.setProperty(ConsumeAzureEventHub.ACCESS_POLICY_NAME, POLICY_NAME);
testRunner.setProperty(ConsumeAzureEventHub.POLICY_PRIMARY_KEY, POLICY_KEY);
testRunner.assertNotValid();
testRunner.setProperty(ConsumeAzureEventHub.CHECKPOINT_STRATEGY, CheckpointStrategy.COMPONENT_STATE.getValue());
testRunner.assertValid();
}
@Test
public void testReceiveOne() {
setProperties();

View File

@ -0,0 +1,220 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.eventhub.checkpoint;
import com.azure.messaging.eventhubs.models.Checkpoint;
import com.azure.messaging.eventhubs.models.PartitionOwnership;
import org.junit.jupiter.api.BeforeEach;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
abstract class AbstractCheckpointStoreTest {
static final String EVENT_HUB_NAMESPACE = "my-event-hub-namespace";
static final String EVENT_HUB_NAME = "my-event-hub-name";
static final String CONSUMER_GROUP = "my-consumer-group";
static final String PARTITION_ID_1 = "1";
static final String PARTITION_ID_2 = "2";
static final String CLIENT_ID_1 = "client-id-1";
static final String CLIENT_ID_2 = "client-id-2";
static final Long LAST_MODIFIED_TIME = 1234567890L;
static final String ETAG = "my-etag";
static final Long OFFSET = 10L;
static final Long SEQUENCE_NUMBER = 1L;
PartitionOwnership partitionOwnership1;
PartitionOwnership partitionOwnership2;
Checkpoint checkpoint1;
Checkpoint checkpoint2;
@BeforeEach
void initTestData() {
partitionOwnership1 = createPartitionOwnership(PARTITION_ID_1, CLIENT_ID_1);
partitionOwnership2 = createPartitionOwnership(PARTITION_ID_2, CLIENT_ID_2);
checkpoint1 = createCheckpoint(PARTITION_ID_1, OFFSET, SEQUENCE_NUMBER);
checkpoint2 = createCheckpoint(PARTITION_ID_2, OFFSET, SEQUENCE_NUMBER);
}
PartitionOwnership createPartitionOwnership(String partitionId, String ownerId) {
return createPartitionOwnership(
EVENT_HUB_NAMESPACE,
EVENT_HUB_NAME,
CONSUMER_GROUP,
partitionId,
ownerId
);
}
PartitionOwnership createPartitionOwnership(
String fullyQualifiedNamespace,
String eventHubName,
String consumerGroup,
String partitionId,
String ownerId) {
return new TestablePartitionOwnership()
.setFullyQualifiedNamespace(fullyQualifiedNamespace)
.setEventHubName(eventHubName)
.setConsumerGroup(consumerGroup)
.setPartitionId(partitionId)
.setOwnerId(ownerId)
.setLastModifiedTime(null)
.setETag(null);
}
Checkpoint createCheckpoint(String partitionId, Long offset, Long sequenceNumber) {
return createCheckpoint(
EVENT_HUB_NAMESPACE,
EVENT_HUB_NAME,
CONSUMER_GROUP,
partitionId,
offset,
sequenceNumber
);
}
Checkpoint createCheckpoint(
String fullyQualifiedNamespace,
String eventHubName,
String consumerGroup,
String partitionId,
Long offset,
Long sequenceNumber) {
return new TestableCheckpoint()
.setFullyQualifiedNamespace(fullyQualifiedNamespace)
.setEventHubName(eventHubName)
.setConsumerGroup(consumerGroup)
.setPartitionId(partitionId)
.setOffset(offset)
.setSequenceNumber(sequenceNumber);
}
PartitionOwnership copy(PartitionOwnership original) {
return convertToTestable(original);
}
Checkpoint copy(Checkpoint original) {
return convertToTestable(original);
}
PartitionOwnership convertToTestable(PartitionOwnership original) {
return new TestablePartitionOwnership()
.setFullyQualifiedNamespace(original.getFullyQualifiedNamespace())
.setEventHubName(original.getEventHubName())
.setConsumerGroup(original.getConsumerGroup())
.setPartitionId(original.getPartitionId())
.setOwnerId(original.getOwnerId())
.setLastModifiedTime(original.getLastModifiedTime())
.setETag(original.getETag());
}
Checkpoint convertToTestable(Checkpoint original) {
return new TestableCheckpoint()
.setFullyQualifiedNamespace(original.getFullyQualifiedNamespace())
.setEventHubName(original.getEventHubName())
.setConsumerGroup(original.getConsumerGroup())
.setPartitionId(original.getPartitionId())
.setOffset(original.getOffset())
.setSequenceNumber(original.getSequenceNumber());
}
List<PartitionOwnership> convertToTestablePartitionOwnerships(List<PartitionOwnership> partitionOwnerships) {
return partitionOwnerships.stream()
.map(this::convertToTestable)
.collect(Collectors.toList());
}
List<Checkpoint> convertToTestableCheckpoints(List<Checkpoint> checkpoints) {
return checkpoints.stream()
.map(this::convertToTestable)
.collect(Collectors.toList());
}
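// The Azure SDK model classes are wrapped in these subclasses purely to gain field-based
// equals()/hashCode(), which assertions such as containsInAnyOrder rely on.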
static class TestablePartitionOwnership extends PartitionOwnership {
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TestablePartitionOwnership that = (TestablePartitionOwnership) o;
return Objects.equals(getFullyQualifiedNamespace(), that.getFullyQualifiedNamespace())
&& Objects.equals(getEventHubName(), that.getEventHubName())
&& Objects.equals(getConsumerGroup(), that.getConsumerGroup())
&& Objects.equals(getPartitionId(), that.getPartitionId())
&& Objects.equals(getOwnerId(), that.getOwnerId())
&& Objects.equals(getLastModifiedTime(), that.getLastModifiedTime())
&& Objects.equals(getETag(), that.getETag());
}
@Override
public int hashCode() {
return Objects.hash(
getFullyQualifiedNamespace(),
getEventHubName(),
getConsumerGroup(),
getPartitionId(),
getOwnerId(),
getLastModifiedTime(),
getETag()
);
}
}
static class TestableCheckpoint extends Checkpoint {
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TestableCheckpoint that = (TestableCheckpoint) o;
return Objects.equals(getFullyQualifiedNamespace(), that.getFullyQualifiedNamespace())
&& Objects.equals(getEventHubName(), that.getEventHubName())
&& Objects.equals(getConsumerGroup(), that.getConsumerGroup())
&& Objects.equals(getPartitionId(), that.getPartitionId())
&& Objects.equals(getOffset(), that.getOffset())
&& Objects.equals(getSequenceNumber(), that.getSequenceNumber());
}
@Override
public int hashCode() {
return Objects.hash(
getFullyQualifiedNamespace(),
getEventHubName(),
getConsumerGroup(),
getPartitionId(),
getOffset(),
getSequenceNumber()
);
}
}
}

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.eventhub.checkpoint;
import com.azure.core.util.CoreUtils;
import com.azure.messaging.eventhubs.models.PartitionOwnership;
import org.apache.nifi.components.state.StateManager;
import org.junit.jupiter.api.BeforeEach;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
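/**
 * Base class for ComponentStateCheckpointStore tests; subclasses provide the StateManager
 * (a Mockito mock or a MockStateManager) through getStateManager().
 */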
abstract class AbstractComponentStateCheckpointStoreTest extends AbstractCheckpointStoreTest {
ComponentStateCheckpointStore checkpointStore;
@BeforeEach
void initCheckpointStore() {
checkpointStore = new ComponentStateCheckpointStore(CLIENT_ID_1, getStateManager());
}
abstract StateManager getStateManager();
PartitionOwnership setETagAndLastModified(PartitionOwnership partitionOwnership) {
return partitionOwnership.setETag(CoreUtils.randomUuid().toString())
.setLastModifiedTime(System.currentTimeMillis() - 1000);
}
void assertClaimedOwnership(PartitionOwnership requestedOwnership, PartitionOwnership claimedOwnership) {
assertEquals(requestedOwnership.getFullyQualifiedNamespace(), claimedOwnership.getFullyQualifiedNamespace());
assertEquals(requestedOwnership.getEventHubName(), claimedOwnership.getEventHubName());
assertEquals(requestedOwnership.getConsumerGroup(), claimedOwnership.getConsumerGroup());
assertEquals(requestedOwnership.getPartitionId(), claimedOwnership.getPartitionId());
assertEquals(requestedOwnership.getOwnerId(), claimedOwnership.getOwnerId());
assertNotNull(claimedOwnership.getLastModifiedTime());
assertThat(claimedOwnership.getLastModifiedTime(), greaterThan(requestedOwnership.getLastModifiedTime() != null ? requestedOwnership.getLastModifiedTime() : 0));
assertNotNull(claimedOwnership.getETag());
assertNotEquals(requestedOwnership.getETag(), claimedOwnership.getETag());
}
Map<String, String> initMap(PartitionOwnership... partitionOwnerships) {
return Stream.of(partitionOwnerships)
.map(this::copy)
.map(this::setETagAndLastModified)
.collect(Collectors.toMap(ComponentStateCheckpointStoreUtils::createOwnershipKey, ComponentStateCheckpointStoreUtils::createOwnershipValue));
}
}

View File

@ -0,0 +1,182 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.eventhub.checkpoint;
import com.azure.messaging.eventhubs.models.PartitionOwnership;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.state.MockStateMap;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.createCheckpointKey;
import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.createCheckpointValue;
import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.createOwnershipKey;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
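/**
 * Concurrency tests for ComponentStateCheckpointStore using a mocked StateManager whose state
 * changes between the getState() and replace() calls, emulating another consumer instance
 * updating the component state at the same time.
 */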
@ExtendWith(MockitoExtension.class)
class ComponentStateCheckpointStoreConcurrencyTest extends AbstractComponentStateCheckpointStoreTest {
@Mock(strictness = Mock.Strictness.WARN)
private StateManager stateManager;
@Captor
private ArgumentCaptor<Map<String, String>> updatedMapCaptor;
@Override
StateManager getStateManager() {
return stateManager;
}
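// Concurrent modification is simulated by failing the first optimistic replace() (returning false)
// and serving an updated StateMap from the second getState() call.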
@Test
void testConcurrentClaimDifferentOwnerships() throws IOException {
StateMap state1 = new MockStateMap(initMap(), 1);
StateMap state2 = new MockStateMap(initMap(partitionOwnership1), 2);
when(stateManager.getState(Scope.CLUSTER))
.thenReturn(state1)
.thenReturn(state2);
when(stateManager.replace(eq(state1), anyMap(), eq(Scope.CLUSTER))).thenReturn(false);
when(stateManager.replace(eq(state2), anyMap(), eq(Scope.CLUSTER))).thenReturn(true);
List<PartitionOwnership> requestedOwnerships = Collections.singletonList(partitionOwnership2);
List<PartitionOwnership> claimedOwnerships = new ArrayList<>();
checkpointStore.claimOwnership(requestedOwnerships).subscribe(claimedOwnerships::add);
assertThat(claimedOwnerships, hasSize(1));
PartitionOwnership claimedOwnership = claimedOwnerships.get(0);
assertClaimedOwnership(partitionOwnership2, claimedOwnership);
verify(stateManager, times(2)).getState(eq(Scope.CLUSTER));
verify(stateManager, times(2)).replace(any(StateMap.class), updatedMapCaptor.capture(), eq(Scope.CLUSTER));
verifyNoMoreInteractions(stateManager);
Map<String, String> updatedMap1 = updatedMapCaptor.getAllValues().get(0);
assertThat(updatedMap1.size(), is(equalTo(1)));
assertThat(updatedMap1, hasEntry(equalTo(createOwnershipKey(partitionOwnership2)), startsWith(partitionOwnership2.getOwnerId())));
Map<String, String> updatedMap2 = updatedMapCaptor.getAllValues().get(1);
assertThat(updatedMap2.size(), is(equalTo(2)));
assertThat(updatedMap2, hasEntry(equalTo(createOwnershipKey(partitionOwnership1)), startsWith(partitionOwnership1.getOwnerId())));
assertThat(updatedMap2, hasEntry(equalTo(createOwnershipKey(partitionOwnership2)), startsWith(partitionOwnership2.getOwnerId())));
}
@Test
void testConcurrentClaimSameOwnership() throws IOException {
StateMap state1 = new MockStateMap(initMap(), 1);
StateMap state2 = new MockStateMap(initMap(partitionOwnership1), 2);
when(stateManager.getState(Scope.CLUSTER))
.thenReturn(state1)
.thenReturn(state2);
when(stateManager.replace(eq(state1), anyMap(), eq(Scope.CLUSTER))).thenReturn(false);
List<PartitionOwnership> requestedOwnerships = Collections.singletonList(partitionOwnership1);
List<PartitionOwnership> claimedOwnerships = new ArrayList<>();
checkpointStore.claimOwnership(requestedOwnerships).subscribe(claimedOwnerships::add);
assertThat(claimedOwnerships, hasSize(0));
verify(stateManager, times(2)).getState(eq(Scope.CLUSTER));
verify(stateManager, times(1)).replace(any(StateMap.class), updatedMapCaptor.capture(), eq(Scope.CLUSTER));
verifyNoMoreInteractions(stateManager);
Map<String, String> updatedMap1 = updatedMapCaptor.getAllValues().get(0);
assertThat(updatedMap1.size(), is(equalTo(1)));
assertThat(updatedMap1, hasEntry(equalTo(createOwnershipKey(partitionOwnership1)), startsWith(partitionOwnership1.getOwnerId())));
}
@Test
void testConcurrentUpdateCheckpoint() throws IOException {
StateMap state1 = new MockStateMap(initMap(partitionOwnership1), 1);
StateMap state2 = new MockStateMap(initMap(partitionOwnership1, partitionOwnership2), 2);
when(stateManager.getState(Scope.CLUSTER))
.thenReturn(state1)
.thenReturn(state2);
when(stateManager.replace(eq(state1), anyMap(), eq(Scope.CLUSTER))).thenReturn(false);
when(stateManager.replace(eq(state2), anyMap(), eq(Scope.CLUSTER))).thenReturn(true);
checkpointStore.updateCheckpoint(checkpoint1).subscribe();
verify(stateManager, times(2)).getState(eq(Scope.CLUSTER));
verify(stateManager, times(2)).replace(any(StateMap.class), updatedMapCaptor.capture(), eq(Scope.CLUSTER));
verifyNoMoreInteractions(stateManager);
Map<String, String> updatedMap1 = updatedMapCaptor.getAllValues().get(0);
assertThat(updatedMap1.size(), is(equalTo(2)));
assertThat(updatedMap1, hasEntry(equalTo(createOwnershipKey(partitionOwnership1)), startsWith(partitionOwnership1.getOwnerId())));
assertThat(updatedMap1, hasEntry(createCheckpointKey(checkpoint1), createCheckpointValue(checkpoint1)));
Map<String, String> updatedMap2 = updatedMapCaptor.getAllValues().get(1);
assertThat(updatedMap2.size(), is(equalTo(3)));
assertThat(updatedMap2, hasEntry(equalTo(createOwnershipKey(partitionOwnership1)), startsWith(partitionOwnership1.getOwnerId())));
assertThat(updatedMap2, hasEntry(equalTo(createOwnershipKey(partitionOwnership2)), startsWith(partitionOwnership2.getOwnerId())));
assertThat(updatedMap2, hasEntry(createCheckpointKey(checkpoint1), createCheckpointValue(checkpoint1)));
}
@Test
void testConcurrentCleanUp() throws IOException {
StateMap state1 = new MockStateMap(initMap(partitionOwnership1), 1);
StateMap state2 = new MockStateMap(initMap(), 2);
when(stateManager.getState(Scope.CLUSTER))
.thenReturn(state1)
.thenReturn(state2);
when(stateManager.replace(eq(state1), anyMap(), eq(Scope.CLUSTER))).thenReturn(false);
checkpointStore.cleanUp(EVENT_HUB_NAMESPACE, EVENT_HUB_NAME, CONSUMER_GROUP + "-2");
verify(stateManager, times(2)).getState(eq(Scope.CLUSTER));
verify(stateManager, times(1)).replace(any(StateMap.class), updatedMapCaptor.capture(), eq(Scope.CLUSTER));
verifyNoMoreInteractions(stateManager);
Map<String, String> updatedMap1 = updatedMapCaptor.getAllValues().get(0);
assertTrue(updatedMap1.isEmpty());
}
}

View File

@ -0,0 +1,255 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.eventhub.checkpoint;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.processors.azure.eventhub.checkpoint.exception.ConcurrentStateModificationException;
import org.apache.nifi.processors.azure.eventhub.checkpoint.exception.StateNotAvailableException;
import org.apache.nifi.state.MockStateMap;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import reactor.test.StepVerifier;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
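/**
 * Failure-path tests: IOExceptions thrown by the StateManager surface as StateNotAvailableException,
 * while repeated optimistic-locking conflicts (replace() returning false) end in
 * ConcurrentStateModificationException once the retries are exhausted.
 */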
@ExtendWith(MockitoExtension.class)
class ComponentStateCheckpointStoreFailureTest extends AbstractComponentStateCheckpointStoreTest {
@Mock(strictness = Mock.Strictness.WARN)
private StateManager stateManager;
@Override
StateManager getStateManager() {
return stateManager;
}
@Test
void testListOwnership_GetState_IOException() throws IOException {
when(stateManager.getState(Scope.CLUSTER)).thenThrow(IOException.class);
StepVerifier.create(checkpointStore.listOwnership(EVENT_HUB_NAMESPACE, EVENT_HUB_NAME, CONSUMER_GROUP))
.expectError(StateNotAvailableException.class)
.verify();
verify(stateManager).getState(Scope.CLUSTER);
verifyNoMoreInteractions(stateManager);
}
@Test
void testListCheckpoints_GetState_IOException() throws IOException {
when(stateManager.getState(Scope.CLUSTER)).thenThrow(IOException.class);
StepVerifier.create(checkpointStore.listCheckpoints(EVENT_HUB_NAMESPACE, EVENT_HUB_NAME, CONSUMER_GROUP))
.expectError(StateNotAvailableException.class)
.verify();
verify(stateManager).getState(Scope.CLUSTER);
verifyNoMoreInteractions(stateManager);
}
@Test
void testClaimOwnership_GetState_IOException() throws IOException {
when(stateManager.getState(Scope.CLUSTER)).thenThrow(IOException.class);
StepVerifier.create(checkpointStore.claimOwnership(Collections.singletonList(partitionOwnership1)))
.expectError(StateNotAvailableException.class)
.verify();
verify(stateManager, times(1)).getState(Scope.CLUSTER);
verifyNoMoreInteractions(stateManager);
}
@Test
void testClaimOwnership_ReplaceState_IOException() throws IOException {
StateMap state = new MockStateMap(new HashMap<>(), 1);
when(stateManager.getState(Scope.CLUSTER)).thenReturn(state);
when(stateManager.replace(eq(state), anyMap(), eq(Scope.CLUSTER))).thenThrow(IOException.class);
StepVerifier.create(checkpointStore.claimOwnership(Collections.singletonList(partitionOwnership1)))
.expectError(StateNotAvailableException.class)
.verify();
verify(stateManager).getState(Scope.CLUSTER);
verify(stateManager).replace(eq(state), anyMap(), eq(Scope.CLUSTER));
verifyNoMoreInteractions(stateManager);
}
@Test
void testUpdateCheckpoint_GetState_IOException() throws IOException {
when(stateManager.getState(Scope.CLUSTER)).thenThrow(IOException.class);
StepVerifier.create(checkpointStore.updateCheckpoint(checkpoint1))
.expectError(StateNotAvailableException.class)
.verify();
verify(stateManager, times(1)).getState(Scope.CLUSTER);
verifyNoMoreInteractions(stateManager);
}
@Test
void testUpdateCheckpoint_ReplaceState_IOException() throws IOException {
StateMap state = new MockStateMap(new HashMap<>(), 1);
when(stateManager.getState(Scope.CLUSTER)).thenReturn(state);
when(stateManager.replace(eq(state), anyMap(), eq(Scope.CLUSTER))).thenThrow(IOException.class);
StepVerifier.create(checkpointStore.updateCheckpoint(checkpoint1))
.expectError(StateNotAvailableException.class)
.verify();
verify(stateManager, times(1)).getState(Scope.CLUSTER);
verify(stateManager).replace(eq(state), anyMap(), eq(Scope.CLUSTER));
verifyNoMoreInteractions(stateManager);
}
@Test
void testCleanUp_GetState_IOException() throws IOException {
when(stateManager.getState(Scope.CLUSTER)).thenThrow(IOException.class);
StepVerifier.create(checkpointStore.cleanUpMono(EVENT_HUB_NAMESPACE, EVENT_HUB_NAME, CONSUMER_GROUP + "-2"))
.expectError(StateNotAvailableException.class)
.verify();
verify(stateManager).getState(Scope.CLUSTER);
verifyNoMoreInteractions(stateManager);
}
@Test
void testCleanUp_ReplaceState_IOException() throws IOException {
StateMap state = new MockStateMap(new HashMap<>(initMap(partitionOwnership1)), 1);
when(stateManager.getState(Scope.CLUSTER)).thenReturn(state);
when(stateManager.replace(eq(state), anyMap(), eq(Scope.CLUSTER))).thenThrow(IOException.class);
StepVerifier.create(checkpointStore.cleanUpMono(EVENT_HUB_NAMESPACE, EVENT_HUB_NAME, CONSUMER_GROUP + "-2"))
.expectError(StateNotAvailableException.class)
.verify();
verify(stateManager, times(1)).getState(Scope.CLUSTER);
verify(stateManager).replace(eq(state), anyMap(), eq(Scope.CLUSTER));
verifyNoMoreInteractions(stateManager);
}
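// The remaining tests cover the retry behaviour after a failed replace(): the *_Success variants
// complete on the third attempt, while the *_Failure variants verify 11 attempts (apparently one
// initial try plus ten retries) before the error is emitted.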
@Test
void testClaimOwnership_ReplaceState_ConcurrentStateModificationException_Success() throws IOException {
StateMap state = new MockStateMap(new HashMap<>(), 1);
when(stateManager.getState(Scope.CLUSTER)).thenReturn(state);
when(stateManager.replace(eq(state), anyMap(), eq(Scope.CLUSTER))).thenReturn(false, false, true);
StepVerifier.withVirtualTime(() -> checkpointStore.claimOwnership(Collections.singletonList(partitionOwnership1)))
.thenAwait(Duration.ofSeconds(1))
.expectNextCount(1)
.verifyComplete();
verify(stateManager, times(3)).getState(Scope.CLUSTER);
verify(stateManager, times(3)).replace(eq(state), anyMap(), eq(Scope.CLUSTER));
verifyNoMoreInteractions(stateManager);
}
@Test
void testClaimOwnership_ReplaceState_ConcurrentStateModificationException_Failure() throws IOException {
StateMap state = new MockStateMap(new HashMap<>(), 1);
when(stateManager.getState(Scope.CLUSTER)).thenReturn(state);
when(stateManager.replace(eq(state), anyMap(), eq(Scope.CLUSTER))).thenReturn(false);
StepVerifier.withVirtualTime(() -> checkpointStore.claimOwnership(Collections.singletonList(partitionOwnership1)))
.thenAwait(Duration.ofSeconds(10))
.expectError(ConcurrentStateModificationException.class)
.verify();
verify(stateManager, times(11)).getState(Scope.CLUSTER);
verify(stateManager, times(11)).replace(eq(state), anyMap(), eq(Scope.CLUSTER));
verifyNoMoreInteractions(stateManager);
}
@Test
void testUpdateCheckpoint_ReplaceState_ConcurrentStateModificationException_Success() throws IOException {
StateMap state = new MockStateMap(new HashMap<>(), 1);
when(stateManager.getState(Scope.CLUSTER)).thenReturn(state);
when(stateManager.replace(eq(state), anyMap(), eq(Scope.CLUSTER))).thenReturn(false, false, true);
StepVerifier.withVirtualTime(() -> checkpointStore.updateCheckpoint(checkpoint1))
.thenAwait(Duration.ofSeconds(1))
.expectNext()
.verifyComplete();
verify(stateManager, times(3)).getState(Scope.CLUSTER);
verify(stateManager, times(3)).replace(eq(state), anyMap(), eq(Scope.CLUSTER));
verifyNoMoreInteractions(stateManager);
}
@Test
void testUpdateCheckpoint_ReplaceState_ConcurrentStateModificationException_Failure() throws IOException {
StateMap state = new MockStateMap(new HashMap<>(), 1);
when(stateManager.getState(Scope.CLUSTER)).thenReturn(state);
when(stateManager.replace(eq(state), anyMap(), eq(Scope.CLUSTER))).thenReturn(false);
StepVerifier.withVirtualTime(() -> checkpointStore.updateCheckpoint(checkpoint1))
.thenAwait(Duration.ofSeconds(10))
.expectError(ConcurrentStateModificationException.class)
.verify();
verify(stateManager, times(11)).getState(Scope.CLUSTER);
verify(stateManager, times(11)).replace(eq(state), anyMap(), eq(Scope.CLUSTER));
verifyNoMoreInteractions(stateManager);
}
@Test
void testCleanUp_ReplaceState_ConcurrentStateModificationException_Success() throws IOException {
StateMap state = new MockStateMap(new HashMap<>(initMap(partitionOwnership1)), 1);
when(stateManager.getState(Scope.CLUSTER)).thenReturn(state);
when(stateManager.replace(eq(state), anyMap(), eq(Scope.CLUSTER))).thenReturn(false, false, true);
StepVerifier.withVirtualTime(() -> checkpointStore.cleanUpMono(EVENT_HUB_NAMESPACE, EVENT_HUB_NAME, CONSUMER_GROUP + "-2"))
.thenAwait(Duration.ofSeconds(1))
.expectNext()
.verifyComplete();
verify(stateManager, times(3)).getState(Scope.CLUSTER);
verify(stateManager, times(3)).replace(eq(state), anyMap(), eq(Scope.CLUSTER));
verifyNoMoreInteractions(stateManager);
}
@Test
void testCleanUp_ReplaceState_ConcurrentStateModificationException_Failure() throws IOException {
StateMap state = new MockStateMap(new HashMap<>(initMap(partitionOwnership1)), 1);
when(stateManager.getState(Scope.CLUSTER)).thenReturn(state);
when(stateManager.replace(eq(state), anyMap(), eq(Scope.CLUSTER))).thenReturn(false);
StepVerifier.withVirtualTime(() -> checkpointStore.cleanUpMono(EVENT_HUB_NAMESPACE, EVENT_HUB_NAME, CONSUMER_GROUP + "-2"))
.thenAwait(Duration.ofSeconds(10))
.expectError(ConcurrentStateModificationException.class)
.verify();
verify(stateManager, times(11)).getState(Scope.CLUSTER);
verify(stateManager, times(11)).replace(eq(state), anyMap(), eq(Scope.CLUSTER));
verifyNoMoreInteractions(stateManager);
}
}

View File

@ -0,0 +1,403 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.eventhub.checkpoint;
import com.azure.messaging.eventhubs.models.Checkpoint;
import com.azure.messaging.eventhubs.models.PartitionOwnership;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.processors.azure.eventhub.checkpoint.exception.ClusterNodeDisconnectedException;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.state.MockStateManager.ExecutionMode;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.nifi.processors.azure.eventhub.checkpoint.CheckpointStoreKey.CLIENT_ID;
import static org.apache.nifi.processors.azure.eventhub.checkpoint.CheckpointStoreKey.CLUSTERED;
import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.createCheckpointKey;
import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.createCheckpointValue;
import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.createOwnershipKey;
import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.createOwnershipValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
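/**
 * Functional tests for ComponentStateCheckpointStore backed by MockStateManager: ownerships and
 * checkpoints are kept in CLUSTER scope, while the client id and the clustered flag live in
 * LOCAL scope.
 */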
class ComponentStateCheckpointStoreTest extends AbstractComponentStateCheckpointStoreTest {
private MockStateManager stateManager;
@Override
StateManager getStateManager() {
stateManager = new MockStateManager(new Object());
stateManager.setIgnoreAnnotations(true);
return stateManager;
}
@BeforeEach
void beforeEach() throws IOException {
stateManager.setExecutionMode(ExecutionMode.CLUSTERED);
addToState(CLIENT_ID.key(), CLIENT_ID_1, Scope.LOCAL);
addToState(CLUSTERED.key(), "true", Scope.LOCAL);
}
@Test
void testOwnershipStoredInState() throws IOException {
checkpointStore.claimOwnership(Collections.singletonList(partitionOwnership1)).blockLast();
String expectedOwnershipKey = "ownership/my-event-hub-namespace/my-event-hub-name/my-consumer-group/1";
stateManager.assertStateSet(expectedOwnershipKey, Scope.CLUSTER);
String ownershipValue = stateManager.getState(Scope.CLUSTER).get(expectedOwnershipKey);
assertTrue(ownershipValue.matches("client-id-1/\\d+/.+"));
}
@Test
void testCheckpointStoredInState() {
checkpointStore.updateCheckpoint(checkpoint1).block();
String expectedCheckpointKey = "checkpoint/my-event-hub-namespace/my-event-hub-name/my-consumer-group/1";
String expectedCheckpointValue = "10/1";
stateManager.assertStateEquals(expectedCheckpointKey, expectedCheckpointValue, Scope.CLUSTER);
}
@Test
void testListing() throws IOException {
initStateWithAllItems();
testListOwnerships(partitionOwnership1, partitionOwnership2);
testListCheckpoints(checkpoint1, checkpoint2);
}
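// In STANDALONE mode MockStateManager serves the CLUSTER scope from the local map, so the two
// LOCAL entries (client id and clustered flag) are included in the expected sizes below.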
@ParameterizedTest
@EnumSource(ExecutionMode.class)
void testCleanUp(ExecutionMode executionMode) throws IOException {
setExecutionMode(executionMode);
initStateWithAllItems();
int expectedBefore = executionMode == ExecutionMode.CLUSTERED ? 10 : 12;
assertEquals(expectedBefore, stateManager.getState(Scope.CLUSTER).toMap().size());
checkpointStore.cleanUp(EVENT_HUB_NAMESPACE, EVENT_HUB_NAME, CONSUMER_GROUP);
int expectedAfter = executionMode == ExecutionMode.CLUSTERED ? 4 : 6;
assertEquals(expectedAfter, stateManager.getState(Scope.CLUSTER).toMap().size());
testListOwnerships(partitionOwnership1, partitionOwnership2);
testListCheckpoints(checkpoint1, checkpoint2);
assertLocalScope(executionMode);
}
@ParameterizedTest
@EnumSource(ExecutionMode.class)
void testClaimOwnership(ExecutionMode executionMode) throws IOException {
setExecutionMode(executionMode);
addToState(partitionOwnership1);
addToState(checkpoint1);
List<PartitionOwnership> requestedOwnerships = Collections.singletonList(partitionOwnership2);
List<PartitionOwnership> claimedOwnerships = checkpointStore.claimOwnership(requestedOwnerships).collectList().block();
assertNotNull(claimedOwnerships);
assertEquals(1, claimedOwnerships.size());
PartitionOwnership claimedOwnership = claimedOwnerships.get(0);
assertClaimedOwnership(partitionOwnership2, claimedOwnership);
assertStoredOwnerships(partitionOwnership1, convertToTestable(claimedOwnership));
assertStoredCheckpoints(checkpoint1);
assertLocalScope(executionMode);
}
@Test
void testRenewOwnership() throws IOException {
addToState(partitionOwnership1);
addToState(partitionOwnership2);
addToState(checkpoint1);
addToState(checkpoint2);
List<PartitionOwnership> requestedOwnerships = Collections.singletonList(partitionOwnership2);
List<PartitionOwnership> claimedOwnerships = checkpointStore.claimOwnership(requestedOwnerships).collectList().block();
assertNotNull(claimedOwnerships);
assertEquals(1, claimedOwnerships.size());
PartitionOwnership claimedOwnership = claimedOwnerships.get(0);
assertClaimedOwnership(partitionOwnership2, claimedOwnership);
assertStoredOwnerships(partitionOwnership1, convertToTestable(claimedOwnership));
assertStoredCheckpoints(checkpoint1, checkpoint2);
}
@Test
void testStealOwnership() throws IOException {
addToState(partitionOwnership1);
addToState(checkpoint1);
PartitionOwnership newOwnership = copy(partitionOwnership1)
.setOwnerId(CLIENT_ID_2);
List<PartitionOwnership> requestedOwnerships = Collections.singletonList(newOwnership);
List<PartitionOwnership> claimedOwnerships = checkpointStore.claimOwnership(requestedOwnerships).collectList().block();
assertNotNull(claimedOwnerships);
assertEquals(1, claimedOwnerships.size());
PartitionOwnership claimedOwnership = claimedOwnerships.get(0);
assertClaimedOwnership(newOwnership, claimedOwnership);
assertStoredOwnerships(convertToTestable(claimedOwnership));
assertStoredCheckpoints(checkpoint1);
}
@Test
void testClaimMultipleOwnerships() {
partitionOwnership2.setOwnerId(CLIENT_ID_1);
List<PartitionOwnership> requestedOwnerships = Arrays.asList(partitionOwnership1, partitionOwnership2);
List<PartitionOwnership> claimedOwnerships = checkpointStore.claimOwnership(requestedOwnerships).collectList().block();
assertNotNull(claimedOwnerships);
assertEquals(2, claimedOwnerships.size());
PartitionOwnership claimedOwnership1;
PartitionOwnership claimedOwnership2;
if (claimedOwnerships.get(0).getPartitionId().equals(PARTITION_ID_1)) {
claimedOwnership1 = claimedOwnerships.get(0);
claimedOwnership2 = claimedOwnerships.get(1);
} else {
claimedOwnership1 = claimedOwnerships.get(1);
claimedOwnership2 = claimedOwnerships.get(0);
}
assertClaimedOwnership(partitionOwnership1, claimedOwnership1);
assertClaimedOwnership(partitionOwnership2, claimedOwnership2);
assertStoredOwnerships(convertToTestable(claimedOwnership1), convertToTestable(claimedOwnership2));
}
@Test
void testClaimOwnershipUnsuccessful() {
List<PartitionOwnership> requestedOwnershipsA = Collections.singletonList(partitionOwnership1);
List<PartitionOwnership> requestedOwnershipsB = Collections.singletonList(partitionOwnership1);
List<PartitionOwnership> claimedOwnershipsA = checkpointStore.claimOwnership(requestedOwnershipsA).collectList().block();
List<PartitionOwnership> claimedOwnershipsB = checkpointStore.claimOwnership(requestedOwnershipsB).collectList().block();
assertNotNull(claimedOwnershipsB);
assertEquals(0, claimedOwnershipsB.size());
assertStoredOwnerships(convertToTestable(claimedOwnershipsA.get(0)));
}
@Test
void testClaimMultipleOwnershipsPartialSuccess() {
List<PartitionOwnership> requestedOwnershipsA = Collections.singletonList(partitionOwnership1);
List<PartitionOwnership> requestedOwnershipsB = Arrays.asList(partitionOwnership1, partitionOwnership2);
List<PartitionOwnership> claimedOwnershipsA = checkpointStore.claimOwnership(requestedOwnershipsA).collectList().block();
List<PartitionOwnership> claimedOwnershipsB = checkpointStore.claimOwnership(requestedOwnershipsB).collectList().block();
assertNotNull(claimedOwnershipsB);
assertEquals(1, claimedOwnershipsB.size());
assertStoredOwnerships(convertToTestable(claimedOwnershipsA.get(0)), convertToTestable(claimedOwnershipsB.get(0)));
}
@Test
void testUnclaimOwnership() throws IOException {
addToState(partitionOwnership1);
addToState(checkpoint1);
PartitionOwnership ownershipToUnclaim = copy(partitionOwnership1)
.setOwnerId("");
List<PartitionOwnership> requestedOwnerships = Collections.singletonList(ownershipToUnclaim);
List<PartitionOwnership> returnedOwnerships = checkpointStore.claimOwnership(requestedOwnerships).collectList().block();
assertNotNull(returnedOwnerships);
assertEquals(1, returnedOwnerships.size());
PartitionOwnership unclaimedOwnership = returnedOwnerships.get(0);
assertClaimedOwnership(ownershipToUnclaim, unclaimedOwnership);
assertStoredOwnerships(convertToTestable(unclaimedOwnership));
assertStoredCheckpoints(checkpoint1);
}
@ParameterizedTest
@EnumSource(ExecutionMode.class)
void testNewCheckpoint(ExecutionMode executionMode) throws IOException {
setExecutionMode(executionMode);
addToState(partitionOwnership1);
addToState(partitionOwnership2);
addToState(checkpoint1);
Checkpoint newCheckpoint = createCheckpoint(PARTITION_ID_2, 20L, 2L);
checkpointStore.updateCheckpoint(newCheckpoint).block();
assertStoredCheckpoints(checkpoint1, newCheckpoint);
assertStoredOwnerships(partitionOwnership1, partitionOwnership2);
assertLocalScope(executionMode);
}
@Test
void testUpdatedCheckpoint() throws IOException {
addToState(partitionOwnership1);
addToState(partitionOwnership2);
addToState(checkpoint1);
addToState(checkpoint2);
Checkpoint updatedCheckpoint = copy(checkpoint2)
.setOffset(20L)
.setSequenceNumber(2L);
checkpointStore.updateCheckpoint(updatedCheckpoint).block();
assertStoredCheckpoints(checkpoint1, updatedCheckpoint);
assertStoredOwnerships(partitionOwnership1, partitionOwnership2);
}
@Test
void testCheckpointWithNullOffset() {
checkpoint1.setOffset(null);
checkpointStore.updateCheckpoint(checkpoint1).block();
assertStoredCheckpoints(checkpoint1);
}
@Test
void testCheckpointWithNullSequenceNumber() {
checkpoint1.setSequenceNumber(null);
checkpointStore.updateCheckpoint(checkpoint1).block();
assertStoredCheckpoints(checkpoint1);
}
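// STANDALONE execution mode combined with the clustered flag still being "true" in local state
// simulates a node that has been disconnected from the cluster; claim and update are rejected.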
@Test
void testDisconnectedNode() {
stateManager.setExecutionMode(ExecutionMode.STANDALONE);
assertThrows(ClusterNodeDisconnectedException.class, () -> checkpointStore.claimOwnership(Collections.singletonList(partitionOwnership1)).collectList().block());
assertThrows(ClusterNodeDisconnectedException.class, () -> checkpointStore.updateCheckpoint(checkpoint1).block());
}
private void setExecutionMode(ExecutionMode executionMode) throws IOException {
stateManager.setExecutionMode(executionMode);
addToState(CLUSTERED.key(), Boolean.toString(executionMode == ExecutionMode.CLUSTERED), Scope.LOCAL);
}
private void testListOwnerships(PartitionOwnership... expectedPartitionOwnerships) {
List<PartitionOwnership> partitionOwnerships = checkpointStore.listOwnership(EVENT_HUB_NAMESPACE, EVENT_HUB_NAME, CONSUMER_GROUP).collectList().block();
assertNotNull(partitionOwnerships);
assertEquals(expectedPartitionOwnerships.length, partitionOwnerships.size());
assertThat(convertToTestablePartitionOwnerships(partitionOwnerships), containsInAnyOrder(expectedPartitionOwnerships));
}
private void testListCheckpoints(Checkpoint... expectedCheckpoints) {
List<Checkpoint> checkpoints = checkpointStore.listCheckpoints(EVENT_HUB_NAMESPACE, EVENT_HUB_NAME, CONSUMER_GROUP).collectList().block();
assertNotNull(checkpoints);
assertEquals(expectedCheckpoints.length, checkpoints.size());
assertThat(convertToTestableCheckpoints(checkpoints), containsInAnyOrder(expectedCheckpoints));
}
private void assertStoredOwnerships(PartitionOwnership... expectedPartitionOwnerships) {
testListOwnerships(expectedPartitionOwnerships);
}
private void assertStoredCheckpoints(Checkpoint... expectedCheckpoints) {
testListCheckpoints(expectedCheckpoints);
}
private void assertLocalScope(ExecutionMode executionMode) {
stateManager.assertStateEquals(CLIENT_ID.key(), CLIENT_ID_1, Scope.LOCAL);
stateManager.assertStateEquals(CLUSTERED.key(), Boolean.toString(executionMode == ExecutionMode.CLUSTERED), Scope.LOCAL);
}
private void addToState(PartitionOwnership partitionOwnership) throws IOException {
setETagAndLastModified(partitionOwnership);
addToState(createOwnershipKey(partitionOwnership), createOwnershipValue(partitionOwnership), Scope.CLUSTER);
}
private void addToState(Checkpoint checkpoint) throws IOException {
addToState(createCheckpointKey(checkpoint), createCheckpointValue(checkpoint), Scope.CLUSTER);
}
private void addToState(String key, String value, Scope scope) throws IOException {
Map<String, String> map = new HashMap<>(stateManager.getState(scope).toMap());
map.put(key, value);
stateManager.setState(map, scope);
}
private void initStateWithAllItems() throws IOException {
addToState(partitionOwnership1);
addToState(partitionOwnership2);
addToState(checkpoint1);
addToState(checkpoint2);
addToState(createPartitionOwnership(EVENT_HUB_NAMESPACE + "-2", EVENT_HUB_NAME, CONSUMER_GROUP, PARTITION_ID_1, CLIENT_ID_1));
addToState(createPartitionOwnership(EVENT_HUB_NAMESPACE, EVENT_HUB_NAME + "-2", CONSUMER_GROUP, PARTITION_ID_1, CLIENT_ID_1));
addToState(createPartitionOwnership(EVENT_HUB_NAMESPACE, EVENT_HUB_NAME, CONSUMER_GROUP + "-2", PARTITION_ID_1, CLIENT_ID_1));
addToState(createCheckpoint(EVENT_HUB_NAMESPACE + "-2", EVENT_HUB_NAME, CONSUMER_GROUP, PARTITION_ID_1, OFFSET, SEQUENCE_NUMBER));
addToState(createCheckpoint(EVENT_HUB_NAMESPACE, EVENT_HUB_NAME + "-2", CONSUMER_GROUP, PARTITION_ID_1, OFFSET, SEQUENCE_NUMBER));
addToState(createCheckpoint(EVENT_HUB_NAMESPACE, EVENT_HUB_NAME, CONSUMER_GROUP + "-2", PARTITION_ID_1, OFFSET, SEQUENCE_NUMBER));
}
}

View File

@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.eventhub.checkpoint;
import com.azure.messaging.eventhubs.models.Checkpoint;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.messaging.eventhubs.models.PartitionOwnership;
import org.junit.jupiter.api.Test;
import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.convertCheckpoint;
import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.convertOwnership;
import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.convertPartitionContext;
import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.createCheckpointKey;
import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.createCheckpointValue;
import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.createOwnershipKey;
import static org.apache.nifi.processors.azure.eventhub.checkpoint.ComponentStateCheckpointStoreUtils.createOwnershipValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
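// Verifies the key/value format used in component state, matching the constants below:
//   ownership key:  ownership/<namespace>/<event hub>/<consumer group>/<partition>, value: <owner id>/<last modified time>/<etag>
//   checkpoint key: checkpoint/<namespace>/<event hub>/<consumer group>/<partition>, value: <offset>/<sequence number>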
class ComponentStateCheckpointStoreUtilsTest extends AbstractCheckpointStoreTest {
private static final String OWNERSHIP_KEY = "ownership/my-event-hub-namespace/my-event-hub-name/my-consumer-group/1";
private static final String OWNERSHIP_VALUE = "client-id-1/1234567890/my-etag";
private static final String CHECKPOINT_KEY = "checkpoint/my-event-hub-namespace/my-event-hub-name/my-consumer-group/1";
private static final String CHECKPOINT_VALUE = "10/1";
@Test
void testConvertOwnership() {
PartitionOwnership partitionOwnership = convertOwnership(OWNERSHIP_KEY, OWNERSHIP_VALUE);
assertNotNull(partitionOwnership);
assertEquals(EVENT_HUB_NAMESPACE, partitionOwnership.getFullyQualifiedNamespace());
assertEquals(EVENT_HUB_NAME, partitionOwnership.getEventHubName());
assertEquals(CONSUMER_GROUP, partitionOwnership.getConsumerGroup());
assertEquals(PARTITION_ID_1, partitionOwnership.getPartitionId());
assertEquals(CLIENT_ID_1, partitionOwnership.getOwnerId());
assertEquals(LAST_MODIFIED_TIME, partitionOwnership.getLastModifiedTime());
assertEquals(ETAG, partitionOwnership.getETag());
}
@Test
void testConvertCheckpoint() {
Checkpoint checkpoint = convertCheckpoint(CHECKPOINT_KEY, CHECKPOINT_VALUE);
assertNotNull(checkpoint);
assertEquals(EVENT_HUB_NAMESPACE, checkpoint.getFullyQualifiedNamespace());
assertEquals(EVENT_HUB_NAME, checkpoint.getEventHubName());
assertEquals(CONSUMER_GROUP, checkpoint.getConsumerGroup());
assertEquals(PARTITION_ID_1, checkpoint.getPartitionId());
assertEquals(OFFSET, checkpoint.getOffset());
assertEquals(SEQUENCE_NUMBER, checkpoint.getSequenceNumber());
}
@Test
void testConvertPartitionContextFromOwnershipKey() {
PartitionContext partitionContext = convertPartitionContext(OWNERSHIP_KEY);
assertNotNull(partitionContext);
assertEquals(EVENT_HUB_NAMESPACE, partitionContext.getFullyQualifiedNamespace());
assertEquals(EVENT_HUB_NAME, partitionContext.getEventHubName());
assertEquals(CONSUMER_GROUP, partitionContext.getConsumerGroup());
assertEquals(PARTITION_ID_1, partitionContext.getPartitionId());
}
@Test
void testConvertPartitionContextFromCheckpointKey() {
PartitionContext partitionContext = convertPartitionContext(CHECKPOINT_KEY);
assertNotNull(partitionContext);
assertEquals(EVENT_HUB_NAMESPACE, partitionContext.getFullyQualifiedNamespace());
assertEquals(EVENT_HUB_NAME, partitionContext.getEventHubName());
assertEquals(CONSUMER_GROUP, partitionContext.getConsumerGroup());
assertEquals(PARTITION_ID_1, partitionContext.getPartitionId());
}
@Test
void testOwnershipKey() {
String ownershipKey = createOwnershipKey(partitionOwnership1);
assertEquals(OWNERSHIP_KEY, ownershipKey);
}
@Test
void testOwnershipValue() {
partitionOwnership1
.setLastModifiedTime(LAST_MODIFIED_TIME)
.setETag(ETAG);
String ownershipValue = createOwnershipValue(partitionOwnership1);
assertEquals(OWNERSHIP_VALUE, ownershipValue);
}
@Test
void testCheckpointKey() {
String checkpointKey = createCheckpointKey(checkpoint1);
assertEquals(CHECKPOINT_KEY, checkpointKey);
}
@Test
void testCheckpointValue() {
String checkpointValue = createCheckpointValue(checkpoint1);
assertEquals(CHECKPOINT_VALUE, checkpointValue);
}
}