From 5f0f801e4600d12e05466bd19abbfc9d745c7994 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 20 Dec 2019 13:58:39 -0500 Subject: [PATCH] NIFI-6879: Added system tests for variables. Fixed bug that resulted in Variable Registry not being updated if a Processor in a child group referenced it Signed-off-by: Pierre Villard This closes #3949. --- .../nifi/groups/StandardProcessGroup.java | 22 ++- .../nifi/tests/system/NiFiClientUtil.java | 139 +++++++++++++--- .../nifi/tests/system/NiFiSystemIT.java | 23 +++ .../variables/ProcessGroupVariablesIT.java | 149 ++++++++++++++++++ .../resources/conf/default/bootstrap.conf | 2 +- .../impl/client/nifi/ConnectionClient.java | 10 ++ .../nifi/impl/JerseyConnectionClient.java | 76 +++++++++ 7 files changed, 392 insertions(+), 29 deletions(-) create mode 100644 nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/variables/ProcessGroupVariablesIT.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 97064cf840..fc52ad7c7d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -48,6 +48,7 @@ import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.PropertyConfiguration; +import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.Snippet; import org.apache.nifi.controller.Template; @@ -3065,9 +3066,24 @@ public final class StandardProcessGroup implements ProcessGroup { final boolean overridden = childRegistry.getVariableMap().containsKey(descriptor); if (!overridden) { final Set affectedComponents = childGroup.getComponentsAffectedByVariable(variableName); - if (!affectedComponents.isEmpty()) { - throw new IllegalStateException("Cannot update variable '" + variableName + "' because it is referenced by " + affectedComponents.size() + " components that are " + - "currently running."); + + for (final ComponentNode affectedComponent : affectedComponents) { + if (affectedComponent instanceof ProcessorNode) { + final ProcessorNode affectedProcessor = (ProcessorNode) affectedComponent; + if (affectedProcessor.isRunning()) { + throw new IllegalStateException("Cannot update variable '" + variableName + "' because it is referenced by " + affectedComponent + ", which is currently running."); + } + } else if (affectedComponent instanceof ControllerServiceNode) { + final ControllerServiceNode affectedService = (ControllerServiceNode) affectedComponent; + if (affectedService.isActive()) { + throw new IllegalStateException("Cannot update variable '" + variableName + "' because it is referenced by " + affectedComponent + ", which is currently active."); + } + } else if (affectedComponent instanceof ReportingTaskNode) { + final ReportingTaskNode affectedReportingTask = (ReportingTaskNode) affectedComponent; + if (affectedReportingTask.isRunning()) { + throw new IllegalStateException("Cannot update variable '" + variableName + "' because it is referenced by " + affectedComponent + ", which is currently running."); + } + } } } } diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java index 21f04f31be..03e14c6d58 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java @@ -31,7 +31,7 @@ import org.apache.nifi.web.api.dto.ConnectionDTO; import org.apache.nifi.web.api.dto.ControllerServiceDTO; import org.apache.nifi.web.api.dto.CounterDTO; import org.apache.nifi.web.api.dto.CountersSnapshotDTO; -import org.apache.nifi.web.api.dto.FlowSnippetDTO; +import org.apache.nifi.web.api.dto.FlowFileSummaryDTO; import org.apache.nifi.web.api.dto.NodeDTO; import org.apache.nifi.web.api.dto.ParameterContextDTO; import org.apache.nifi.web.api.dto.ParameterContextReferenceDTO; @@ -42,6 +42,8 @@ import org.apache.nifi.web.api.dto.ProcessorConfigDTO; import org.apache.nifi.web.api.dto.ProcessorDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; import org.apache.nifi.web.api.dto.RevisionDTO; +import org.apache.nifi.web.api.dto.VariableDTO; +import org.apache.nifi.web.api.dto.VariableRegistryDTO; import org.apache.nifi.web.api.dto.flow.FlowDTO; import org.apache.nifi.web.api.dto.flow.ProcessGroupFlowDTO; import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO; @@ -53,6 +55,8 @@ import org.apache.nifi.web.api.entity.ControllerServiceRunStatusEntity; import org.apache.nifi.web.api.entity.ControllerServicesEntity; import org.apache.nifi.web.api.entity.CountersEntity; import org.apache.nifi.web.api.entity.DropRequestEntity; +import org.apache.nifi.web.api.entity.FlowFileEntity; +import org.apache.nifi.web.api.entity.ListingRequestEntity; import org.apache.nifi.web.api.entity.NodeEntity; import org.apache.nifi.web.api.entity.ParameterContextEntity; import org.apache.nifi.web.api.entity.ParameterContextReferenceEntity; @@ -64,7 +68,12 @@ import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity; import org.apache.nifi.web.api.entity.ProcessorEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; import org.apache.nifi.web.api.entity.ScheduleComponentsEntity; +import org.apache.nifi.web.api.entity.VariableEntity; +import org.apache.nifi.web.api.entity.VariableRegistryEntity; +import org.apache.nifi.web.api.entity.VariableRegistryUpdateRequestEntity; import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Arrays; @@ -77,6 +86,8 @@ import java.util.Set; import java.util.stream.Collectors; public class NiFiClientUtil { + private static final Logger logger = LoggerFactory.getLogger(NiFiClientUtil.class); + private final NiFiClient nifiClient; private final String nifiVersion; @@ -89,12 +100,21 @@ public class NiFiClientUtil { return createProcessor(NiFiSystemIT.TEST_PROCESSORS_PACKAGE + "." + simpleTypeName, NiFiSystemIT.NIFI_GROUP_ID, NiFiSystemIT.TEST_EXTENSIONS_ARTIFACT_ID, nifiVersion); } - public ProcessorEntity createProcessor(final String type, final String groupId, final String artifactId, final String version) throws NiFiClientException, IOException { + public ProcessorEntity createProcessor(final String simpleTypeName, final String groupId) throws NiFiClientException, IOException { + return createProcessor(NiFiSystemIT.TEST_PROCESSORS_PACKAGE + "." + simpleTypeName, groupId, NiFiSystemIT.NIFI_GROUP_ID, NiFiSystemIT.TEST_EXTENSIONS_ARTIFACT_ID, nifiVersion); + } + + public ProcessorEntity createProcessor(final String type, final String bundleGroupId, final String artifactId, final String version) throws NiFiClientException, IOException { + return createProcessor(type, "root", bundleGroupId, artifactId, version); + } + + public ProcessorEntity createProcessor(final String type, final String processGroupId, final String bundleGroupId, final String artifactId, final String version) + throws NiFiClientException, IOException { final ProcessorDTO dto = new ProcessorDTO(); dto.setType(type); final BundleDTO bundle = new BundleDTO(); - bundle.setGroup(groupId); + bundle.setGroup(bundleGroupId); bundle.setArtifact(artifactId); bundle.setVersion(version); dto.setBundle(bundle); @@ -103,7 +123,7 @@ public class NiFiClientUtil { entity.setComponent(dto); entity.setRevision(createNewRevision()); - return nifiClient.getProcessorClient().createProcessor("root", entity); + return nifiClient.getProcessorClient().createProcessor(processGroupId, entity); } public ControllerServiceEntity createControllerService(final String simpleTypeName) throws NiFiClientException, IOException { @@ -153,7 +173,7 @@ public class NiFiClientUtil { return entity; } - private RevisionDTO createNewRevision() { + public RevisionDTO createNewRevision() { final RevisionDTO revisionDto = new RevisionDTO(); revisionDto.setClientId(getClass().getName()); revisionDto.setVersion(0L); @@ -277,9 +297,8 @@ public class NiFiClientUtil { return; } - if ("Validating".equals(validationStatus)) { - Thread.sleep(100L); - continue; + if ("Invalid".equalsIgnoreCase(validationStatus)) { + logger.info("Processor with ID {} is currently invalid due to: {}", processorId, entity.getComponent().getValidationErrors()); } Thread.sleep(100L); @@ -378,23 +397,7 @@ public class NiFiClientUtil { } for (final ProcessGroupEntity group : rootFlowDTO.getProcessGroups()) { - waitForProcessorsStopped(group.getComponent()); - } - } - - private void waitForProcessorsStopped(final ProcessGroupDTO group) throws IOException, NiFiClientException { - final FlowSnippetDTO groupContents = group.getContents(); - for (final ProcessorDTO processor : groupContents.getProcessors()) { - try { - waitForStoppedProcessor(processor.getId()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new NiFiClientException("Interrupted while waiting for Processor with ID " + processor.getId() + " to stop"); - } - } - - for (final ProcessGroupDTO child : groupContents.getProcessGroups()) { - waitForProcessorsStopped(child); + waitForProcessorsStopped(group.getId()); } } @@ -659,4 +662,90 @@ public class NiFiClientUtil { return nifiClient.getControllerClient().disconnectNode(nodeId, nodeEntity); } + + public ListingRequestEntity performQueueListing(final String connectionId) throws NiFiClientException, IOException { + try { + ListingRequestEntity listingRequestEntity = nifiClient.getConnectionClient().listQueue(connectionId); + while (listingRequestEntity.getListingRequest().getFinished() != Boolean.TRUE) { + Thread.sleep(10L); + + listingRequestEntity = nifiClient.getConnectionClient().getListingRequest(connectionId, listingRequestEntity.getListingRequest().getId()); + } + + // Delete the listing. Return the previously obtained listing, not the one from the deletion, because the listing request that is returned from deleting the listing does not contain the + // FlowFile Summaries. + nifiClient.getConnectionClient().deleteListingRequest(connectionId, listingRequestEntity.getListingRequest().getId()); + return listingRequestEntity; + } catch (final InterruptedException e) { + Assert.fail("Failed to obtain connection status"); + return null; + } + } + + public FlowFileEntity getQueueFlowFile(final String connectionId, final int flowFileIndex) throws NiFiClientException, IOException { + final ListingRequestEntity listing = performQueueListing(connectionId); + final List flowFileSummaries = listing.getListingRequest().getFlowFileSummaries(); + if (flowFileIndex >= flowFileSummaries.size()) { + throw new IllegalArgumentException("Cannot retrieve FlowFile with index " + flowFileIndex + " because queue only has " + flowFileSummaries.size() + " FlowFiles"); + } + + final FlowFileSummaryDTO flowFileSummary = flowFileSummaries.get(flowFileIndex); + final String uuid = flowFileSummary.getUuid(); + + return nifiClient.getConnectionClient().getFlowFile(connectionId, uuid); + } + + public VariableRegistryUpdateRequestEntity updateVariableRegistry(final ProcessGroupEntity processGroup, final Map variables) throws NiFiClientException, IOException { + final Set variableEntities = new HashSet<>(); + for (final Map.Entry entry : variables.entrySet()) { + final VariableEntity entity = new VariableEntity(); + variableEntities.add(entity); + + final VariableDTO dto = new VariableDTO(); + dto.setName(entry.getKey()); + dto.setValue(entry.getValue()); + dto.setProcessGroupId(processGroup.getId()); + entity.setVariable(dto); + } + + final VariableRegistryDTO variableRegistryDto = new VariableRegistryDTO(); + variableRegistryDto.setProcessGroupId(processGroup.getId()); + variableRegistryDto.setVariables(variableEntities); + + final VariableRegistryEntity registryEntity = new VariableRegistryEntity(); + registryEntity.setProcessGroupRevision(processGroup.getRevision()); + registryEntity.setVariableRegistry(variableRegistryDto); + + VariableRegistryUpdateRequestEntity updateRequestEntity = nifiClient.getProcessGroupClient().updateVariableRegistry(processGroup.getId(), registryEntity); + while (!updateRequestEntity.getRequest().isComplete()) { + try { + Thread.sleep(100L); + } catch (final InterruptedException ie) { + Assert.fail("Interrupted while waiting for variable registry to update"); + return null; + } + + updateRequestEntity = nifiClient.getProcessGroupClient().getVariableRegistryUpdateRequest(processGroup.getId(), updateRequestEntity.getRequest().getRequestId()); + } + + if (updateRequestEntity.getRequest().getFailureReason() != null) { + Assert.fail("Failed to update Variable Registry due to: " + updateRequestEntity.getRequest().getFailureReason()); + } + + nifiClient.getProcessGroupClient().deleteVariableRegistryUpdateRequest(processGroup.getId(), updateRequestEntity.getRequest().getRequestId()); + return updateRequestEntity; + } + + public ProcessGroupEntity createProcessGroup(final String name, final String parentGroupId) throws NiFiClientException, IOException { + final ProcessGroupDTO component = new ProcessGroupDTO(); + component.setName(name); + component.setParentGroupId(parentGroupId); + + final ProcessGroupEntity childGroupEntity = new ProcessGroupEntity(); + childGroupEntity.setRevision(createNewRevision()); + childGroupEntity.setComponent(component); + + final ProcessGroupEntity childGroup = nifiClient.getProcessGroupClient().createProcessGroup(parentGroupId, childGroupEntity); + return childGroup; + } } diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java index 82a739cd86..23e04ba38a 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java @@ -22,8 +22,10 @@ import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientConfig; import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; import org.apache.nifi.toolkit.cli.impl.client.nifi.impl.JerseyNiFiClient; import org.apache.nifi.web.api.entity.ClusteSummaryEntity; +import org.apache.nifi.web.api.entity.ConnectionStatusEntity; import org.junit.After; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.rules.TestName; @@ -235,4 +237,25 @@ public abstract class NiFiSystemIT { Thread.sleep(10L); } } + + protected void waitForQueueCount(final String connectionId, final int queueSize) throws InterruptedException { + waitFor(() -> { + final ConnectionStatusEntity statusEntity = getConnectionStatus(connectionId); + return statusEntity.getConnectionStatus().getAggregateSnapshot().getFlowFilesQueued() == queueSize; + }); + } + + private ConnectionStatusEntity getConnectionStatus(final String connectionId) { + try { + return getNifiClient().getFlowClient().getConnectionStatus(connectionId, true); + } catch (final Exception e) { + Assert.fail("Failed to obtain connection status"); + return null; + } + } + + protected int getConnectionQueueSize(final String connectionId) { + final ConnectionStatusEntity statusEntity = getConnectionStatus(connectionId); + return statusEntity.getConnectionStatus().getAggregateSnapshot().getFlowFilesQueued(); + } } diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/variables/ProcessGroupVariablesIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/variables/ProcessGroupVariablesIT.java new file mode 100644 index 0000000000..908a8a992c --- /dev/null +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/variables/ProcessGroupVariablesIT.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.tests.system.variables; + +import org.apache.nifi.tests.system.NiFiSystemIT; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; +import org.apache.nifi.web.api.entity.ConnectionEntity; +import org.apache.nifi.web.api.entity.FlowFileEntity; +import org.apache.nifi.web.api.entity.ProcessGroupEntity; +import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; + +public class ProcessGroupVariablesIT extends NiFiSystemIT { + + @Test + public void testChangeVariableWhileProcessorRunning() throws NiFiClientException, IOException, InterruptedException { + // Add variable abc=123 to the root group + ProcessGroupEntity rootGroup = getNifiClient().getProcessGroupClient().getProcessGroup("root"); + getClientUtil().updateVariableRegistry(rootGroup, Collections.singletonMap("abc", "123")); + + // Create GenerateFlowFile that uses Variable to create FlowFile attribute. Connect to CountEvents processor. + final ProcessorEntity generateFlowFile = getClientUtil().createProcessor("GenerateFlowFile"); + getClientUtil().updateProcessorProperties(generateFlowFile, Collections.singletonMap("Value", "Hello ${abc}")); + getClientUtil().updateProcessorSchedulingPeriod(generateFlowFile, "10 mins"); + + final ProcessorEntity countEvents = getClientUtil().createProcessor("CountEvents"); + final ConnectionEntity connection = getClientUtil().createConnection(generateFlowFile, countEvents, "success"); + getClientUtil().setAutoTerminatedRelationships(countEvents, "success"); + + // Wait for processor to be valid + getClientUtil().waitForValidProcessor(generateFlowFile.getId()); + + // Start Processor, wait for 1 FlowFile to be queued up, then stop processor + getNifiClient().getProcessorClient().startProcessor(generateFlowFile); + waitForQueueCount(connection.getId(), 1); + + final FlowFileEntity flowFile = getClientUtil().getQueueFlowFile(connection.getId(), 0); + assertEquals("Hello 123", flowFile.getFlowFile().getAttributes().get("Value")); + + getClientUtil().emptyQueue(connection.getId()); + waitForQueueCount(connection.getId(), 0); + + rootGroup = getNifiClient().getProcessGroupClient().getProcessGroup("root"); + getClientUtil().updateVariableRegistry(rootGroup, Collections.singletonMap("abc", "xyz")); + waitForQueueCount(connection.getId(), 1); + + final FlowFileEntity secondFlowFile = getClientUtil().getQueueFlowFile(connection.getId(), 0); + assertEquals("Hello xyz", secondFlowFile.getFlowFile().getAttributes().get("Value")); + } + + + @Test + public void testChangeHigherLevelVariableWhileProcessorRunning() throws NiFiClientException, IOException, InterruptedException { + // Add variable abc=123 to the root group + ProcessGroupEntity rootGroup = getNifiClient().getProcessGroupClient().getProcessGroup("root"); + getClientUtil().updateVariableRegistry(rootGroup, Collections.singletonMap("abc", "123")); + + final ProcessGroupEntity childGroup = getClientUtil().createProcessGroup("child", "root"); + + // Create GenerateFlowFile that uses Variable to create FlowFile attribute. Connect to CountEvents processor. + final ProcessorEntity generateFlowFile = getClientUtil().createProcessor("GenerateFlowFile", childGroup.getId()); + getClientUtil().updateProcessorProperties(generateFlowFile, Collections.singletonMap("Value", "Hello ${abc}")); + getClientUtil().updateProcessorSchedulingPeriod(generateFlowFile, "10 mins"); + + final ProcessorEntity countEvents = getClientUtil().createProcessor("CountEvents", childGroup.getId()); + final ConnectionEntity connection = getClientUtil().createConnection(generateFlowFile, countEvents, "success"); + getClientUtil().setAutoTerminatedRelationships(countEvents, "success"); + + // Wait for processor to be valid + getClientUtil().waitForValidProcessor(generateFlowFile.getId()); + + // Start Processor, wait for 1 FlowFile to be queued up, then stop processor + getNifiClient().getProcessorClient().startProcessor(generateFlowFile); + waitForQueueCount(connection.getId(), 1); + + final FlowFileEntity flowFile = getClientUtil().getQueueFlowFile(connection.getId(), 0); + assertEquals("Hello 123", flowFile.getFlowFile().getAttributes().get("Value")); + + getClientUtil().emptyQueue(connection.getId()); + waitForQueueCount(connection.getId(), 0); + + rootGroup = getNifiClient().getProcessGroupClient().getProcessGroup("root"); + getClientUtil().updateVariableRegistry(rootGroup, Collections.singletonMap("abc", "xyz")); + waitForQueueCount(connection.getId(), 1); + + final FlowFileEntity secondFlowFile = getClientUtil().getQueueFlowFile(connection.getId(), 0); + assertEquals("Hello xyz", secondFlowFile.getFlowFile().getAttributes().get("Value")); + } + + + @Test + public void testChangeVariableThatIsOverridden() throws NiFiClientException, IOException, InterruptedException { + // Add variable abc=123 to the root group + ProcessGroupEntity rootGroup = getNifiClient().getProcessGroupClient().getProcessGroup("root"); + getClientUtil().updateVariableRegistry(rootGroup, Collections.singletonMap("abc", "123")); + + final ProcessGroupEntity childGroup = getClientUtil().createProcessGroup("child", "root"); + getClientUtil().updateVariableRegistry(childGroup, Collections.singletonMap("abc", "123")); + + // Create GenerateFlowFile that uses Variable to create FlowFile attribute. Connect to CountEvents processor. + final ProcessorEntity generateFlowFile = getClientUtil().createProcessor("GenerateFlowFile", childGroup.getId()); + getClientUtil().updateProcessorProperties(generateFlowFile, Collections.singletonMap("Value", "Hello ${abc}")); + getClientUtil().updateProcessorSchedulingPeriod(generateFlowFile, "10 mins"); + + final ProcessorEntity countEvents = getClientUtil().createProcessor("CountEvents", childGroup.getId()); + final ConnectionEntity connection = getClientUtil().createConnection(generateFlowFile, countEvents, "success"); + getClientUtil().setAutoTerminatedRelationships(countEvents, "success"); + + // Wait for processor to be valid + getClientUtil().waitForValidProcessor(generateFlowFile.getId()); + + // Start Processor, wait for 1 FlowFile to be queued up, then stop processor + getNifiClient().getProcessorClient().startProcessor(generateFlowFile); + waitForQueueCount(connection.getId(), 1); + + final FlowFileEntity flowFile = getClientUtil().getQueueFlowFile(connection.getId(), 0); + assertEquals("Hello 123", flowFile.getFlowFile().getAttributes().get("Value")); + + getClientUtil().emptyQueue(connection.getId()); + waitForQueueCount(connection.getId(), 0); + + rootGroup = getNifiClient().getProcessGroupClient().getProcessGroup("root"); + getClientUtil().updateVariableRegistry(rootGroup, Collections.singletonMap("abc", "xyz")); + + // Wait a bit and ensure that the queue is still empty. + Thread.sleep(2000L); + + assertEquals(0, getConnectionQueueSize(connection.getId())); + } +} diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/bootstrap.conf b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/bootstrap.conf index 3dc2c298af..a68d11ae94 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/bootstrap.conf +++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/bootstrap.conf @@ -28,6 +28,6 @@ java.arg.3=-Xmx128m java.arg.14=-Djava.awt.headless=true -#java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8002 +java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8002 nifi.bootstrap.sensitive.key= \ No newline at end of file diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ConnectionClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ConnectionClient.java index af5b5e9697..a486a65825 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ConnectionClient.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ConnectionClient.java @@ -18,6 +18,8 @@ package org.apache.nifi.toolkit.cli.impl.client.nifi; import org.apache.nifi.web.api.entity.ConnectionEntity; import org.apache.nifi.web.api.entity.DropRequestEntity; +import org.apache.nifi.web.api.entity.FlowFileEntity; +import org.apache.nifi.web.api.entity.ListingRequestEntity; import java.io.IOException; @@ -37,4 +39,12 @@ public interface ConnectionClient { DropRequestEntity getDropRequest(String connectionId, String dropRequestId) throws NiFiClientException, IOException; DropRequestEntity deleteDropRequest(String connectionId, String dropRequestId) throws NiFiClientException, IOException; + + ListingRequestEntity listQueue(String connectionId) throws NiFiClientException, IOException; + + ListingRequestEntity getListingRequest(String connectionId, String listingRequestId) throws NiFiClientException, IOException; + + ListingRequestEntity deleteListingRequest(String connectionId, String listingRequestId) throws NiFiClientException, IOException; + + FlowFileEntity getFlowFile(String connectionId, String flowFileUuid) throws NiFiClientException, IOException; } diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyConnectionClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyConnectionClient.java index 51ded4e3a9..f4180fee60 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyConnectionClient.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyConnectionClient.java @@ -21,6 +21,8 @@ import org.apache.nifi.toolkit.cli.impl.client.nifi.ConnectionClient; import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; import org.apache.nifi.web.api.entity.ConnectionEntity; import org.apache.nifi.web.api.entity.DropRequestEntity; +import org.apache.nifi.web.api.entity.FlowFileEntity; +import org.apache.nifi.web.api.entity.ListingRequestEntity; import javax.ws.rs.client.Entity; import javax.ws.rs.client.WebTarget; @@ -184,4 +186,78 @@ public class JerseyConnectionClient extends AbstractJerseyClient implements Conn }); } + @Override + public ListingRequestEntity listQueue(final String connectionId) throws NiFiClientException, IOException { + if (connectionId == null) { + throw new IllegalArgumentException("Connection ID cannot be null"); + } + + return executeAction("Error listing queue for Connection", () -> { + final WebTarget target = flowFileQueueTarget + .path("listing-requests") + .resolveTemplate("id", connectionId); + + return getRequestBuilder(target).post( + Entity.entity(connectionId, MediaType.TEXT_PLAIN), + ListingRequestEntity.class + ); + }); + } + + @Override + public ListingRequestEntity getListingRequest(final String connectionId, final String listingRequestId) throws NiFiClientException, IOException { + if (connectionId == null) { + throw new IllegalArgumentException("Connection ID cannot be null"); + } + if (listingRequestId == null) { + throw new IllegalArgumentException("Listing Request ID cannot be null"); + } + + return executeAction("Error retrieving Listing Request", () -> { + final WebTarget target = flowFileQueueTarget + .path("listing-requests/{requestId}") + .resolveTemplate("id", connectionId) + .resolveTemplate("requestId", listingRequestId); + + return getRequestBuilder(target).get(ListingRequestEntity.class); + }); + } + + @Override + public ListingRequestEntity deleteListingRequest(final String connectionId, final String listingRequestId) throws NiFiClientException, IOException { + if (connectionId == null) { + throw new IllegalArgumentException("Connection ID cannot be null"); + } + if (listingRequestId == null) { + throw new IllegalArgumentException("Listing Request ID cannot be null"); + } + + return executeAction("Error retrieving Listing Request", () -> { + final WebTarget target = flowFileQueueTarget + .path("listing-requests/{requestId}") + .resolveTemplate("id", connectionId) + .resolveTemplate("requestId", listingRequestId); + + return getRequestBuilder(target).delete(ListingRequestEntity.class); + }); + } + + @Override + public FlowFileEntity getFlowFile(final String connectionId, final String flowFileUuid) throws NiFiClientException, IOException { + if (connectionId == null) { + throw new IllegalArgumentException("Connection ID cannot be null"); + } + if (flowFileUuid == null) { + throw new IllegalArgumentException("FlowFile UUID cannot be null"); + } + + return executeAction("Error retrieving FlowFile", () -> { + final WebTarget target = flowFileQueueTarget + .path("flowfiles/{uuid}") + .resolveTemplate("id", connectionId) + .resolveTemplate("uuid", flowFileUuid); + + return getRequestBuilder(target).get(FlowFileEntity.class); + }); + } }