NIFI-6879: Added system tests for variables. Fixed bug that resulted in Variable Registry not being updated if a Processor in a child group referenced it

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #3949.
This commit is contained in:
Mark Payne 2019-12-20 13:58:39 -05:00 committed by Pierre Villard
parent 7f49c6b76a
commit 5f0f801e46
No known key found for this signature in database
GPG Key ID: BEE1599F0726E9CD
7 changed files with 392 additions and 29 deletions

View File

@ -48,6 +48,7 @@ import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.PropertyConfiguration; import org.apache.nifi.controller.PropertyConfiguration;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.Snippet; import org.apache.nifi.controller.Snippet;
import org.apache.nifi.controller.Template; import org.apache.nifi.controller.Template;
@ -3065,9 +3066,24 @@ public final class StandardProcessGroup implements ProcessGroup {
final boolean overridden = childRegistry.getVariableMap().containsKey(descriptor); final boolean overridden = childRegistry.getVariableMap().containsKey(descriptor);
if (!overridden) { if (!overridden) {
final Set<ComponentNode> affectedComponents = childGroup.getComponentsAffectedByVariable(variableName); final Set<ComponentNode> affectedComponents = childGroup.getComponentsAffectedByVariable(variableName);
if (!affectedComponents.isEmpty()) {
throw new IllegalStateException("Cannot update variable '" + variableName + "' because it is referenced by " + affectedComponents.size() + " components that are " + for (final ComponentNode affectedComponent : affectedComponents) {
"currently running."); if (affectedComponent instanceof ProcessorNode) {
final ProcessorNode affectedProcessor = (ProcessorNode) affectedComponent;
if (affectedProcessor.isRunning()) {
throw new IllegalStateException("Cannot update variable '" + variableName + "' because it is referenced by " + affectedComponent + ", which is currently running.");
}
} else if (affectedComponent instanceof ControllerServiceNode) {
final ControllerServiceNode affectedService = (ControllerServiceNode) affectedComponent;
if (affectedService.isActive()) {
throw new IllegalStateException("Cannot update variable '" + variableName + "' because it is referenced by " + affectedComponent + ", which is currently active.");
}
} else if (affectedComponent instanceof ReportingTaskNode) {
final ReportingTaskNode affectedReportingTask = (ReportingTaskNode) affectedComponent;
if (affectedReportingTask.isRunning()) {
throw new IllegalStateException("Cannot update variable '" + variableName + "' because it is referenced by " + affectedComponent + ", which is currently running.");
}
}
} }
} }
} }

View File

@ -31,7 +31,7 @@ import org.apache.nifi.web.api.dto.ConnectionDTO;
import org.apache.nifi.web.api.dto.ControllerServiceDTO; import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.CounterDTO; import org.apache.nifi.web.api.dto.CounterDTO;
import org.apache.nifi.web.api.dto.CountersSnapshotDTO; import org.apache.nifi.web.api.dto.CountersSnapshotDTO;
import org.apache.nifi.web.api.dto.FlowSnippetDTO; import org.apache.nifi.web.api.dto.FlowFileSummaryDTO;
import org.apache.nifi.web.api.dto.NodeDTO; import org.apache.nifi.web.api.dto.NodeDTO;
import org.apache.nifi.web.api.dto.ParameterContextDTO; import org.apache.nifi.web.api.dto.ParameterContextDTO;
import org.apache.nifi.web.api.dto.ParameterContextReferenceDTO; import org.apache.nifi.web.api.dto.ParameterContextReferenceDTO;
@ -42,6 +42,8 @@ import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO; import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RevisionDTO; import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.dto.VariableDTO;
import org.apache.nifi.web.api.dto.VariableRegistryDTO;
import org.apache.nifi.web.api.dto.flow.FlowDTO; import org.apache.nifi.web.api.dto.flow.FlowDTO;
import org.apache.nifi.web.api.dto.flow.ProcessGroupFlowDTO; import org.apache.nifi.web.api.dto.flow.ProcessGroupFlowDTO;
import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO; import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO;
@ -53,6 +55,8 @@ import org.apache.nifi.web.api.entity.ControllerServiceRunStatusEntity;
import org.apache.nifi.web.api.entity.ControllerServicesEntity; import org.apache.nifi.web.api.entity.ControllerServicesEntity;
import org.apache.nifi.web.api.entity.CountersEntity; import org.apache.nifi.web.api.entity.CountersEntity;
import org.apache.nifi.web.api.entity.DropRequestEntity; import org.apache.nifi.web.api.entity.DropRequestEntity;
import org.apache.nifi.web.api.entity.FlowFileEntity;
import org.apache.nifi.web.api.entity.ListingRequestEntity;
import org.apache.nifi.web.api.entity.NodeEntity; import org.apache.nifi.web.api.entity.NodeEntity;
import org.apache.nifi.web.api.entity.ParameterContextEntity; import org.apache.nifi.web.api.entity.ParameterContextEntity;
import org.apache.nifi.web.api.entity.ParameterContextReferenceEntity; import org.apache.nifi.web.api.entity.ParameterContextReferenceEntity;
@ -64,7 +68,12 @@ import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity; import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
import org.apache.nifi.web.api.entity.ScheduleComponentsEntity; import org.apache.nifi.web.api.entity.ScheduleComponentsEntity;
import org.apache.nifi.web.api.entity.VariableEntity;
import org.apache.nifi.web.api.entity.VariableRegistryEntity;
import org.apache.nifi.web.api.entity.VariableRegistryUpdateRequestEntity;
import org.junit.Assert; import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
@ -77,6 +86,8 @@ import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
public class NiFiClientUtil { public class NiFiClientUtil {
private static final Logger logger = LoggerFactory.getLogger(NiFiClientUtil.class);
private final NiFiClient nifiClient; private final NiFiClient nifiClient;
private final String nifiVersion; private final String nifiVersion;
@ -89,12 +100,21 @@ public class NiFiClientUtil {
return createProcessor(NiFiSystemIT.TEST_PROCESSORS_PACKAGE + "." + simpleTypeName, NiFiSystemIT.NIFI_GROUP_ID, NiFiSystemIT.TEST_EXTENSIONS_ARTIFACT_ID, nifiVersion); return createProcessor(NiFiSystemIT.TEST_PROCESSORS_PACKAGE + "." + simpleTypeName, NiFiSystemIT.NIFI_GROUP_ID, NiFiSystemIT.TEST_EXTENSIONS_ARTIFACT_ID, nifiVersion);
} }
public ProcessorEntity createProcessor(final String type, final String groupId, final String artifactId, final String version) throws NiFiClientException, IOException { public ProcessorEntity createProcessor(final String simpleTypeName, final String groupId) throws NiFiClientException, IOException {
return createProcessor(NiFiSystemIT.TEST_PROCESSORS_PACKAGE + "." + simpleTypeName, groupId, NiFiSystemIT.NIFI_GROUP_ID, NiFiSystemIT.TEST_EXTENSIONS_ARTIFACT_ID, nifiVersion);
}
public ProcessorEntity createProcessor(final String type, final String bundleGroupId, final String artifactId, final String version) throws NiFiClientException, IOException {
return createProcessor(type, "root", bundleGroupId, artifactId, version);
}
public ProcessorEntity createProcessor(final String type, final String processGroupId, final String bundleGroupId, final String artifactId, final String version)
throws NiFiClientException, IOException {
final ProcessorDTO dto = new ProcessorDTO(); final ProcessorDTO dto = new ProcessorDTO();
dto.setType(type); dto.setType(type);
final BundleDTO bundle = new BundleDTO(); final BundleDTO bundle = new BundleDTO();
bundle.setGroup(groupId); bundle.setGroup(bundleGroupId);
bundle.setArtifact(artifactId); bundle.setArtifact(artifactId);
bundle.setVersion(version); bundle.setVersion(version);
dto.setBundle(bundle); dto.setBundle(bundle);
@ -103,7 +123,7 @@ public class NiFiClientUtil {
entity.setComponent(dto); entity.setComponent(dto);
entity.setRevision(createNewRevision()); entity.setRevision(createNewRevision());
return nifiClient.getProcessorClient().createProcessor("root", entity); return nifiClient.getProcessorClient().createProcessor(processGroupId, entity);
} }
public ControllerServiceEntity createControllerService(final String simpleTypeName) throws NiFiClientException, IOException { public ControllerServiceEntity createControllerService(final String simpleTypeName) throws NiFiClientException, IOException {
@ -153,7 +173,7 @@ public class NiFiClientUtil {
return entity; return entity;
} }
private RevisionDTO createNewRevision() { public RevisionDTO createNewRevision() {
final RevisionDTO revisionDto = new RevisionDTO(); final RevisionDTO revisionDto = new RevisionDTO();
revisionDto.setClientId(getClass().getName()); revisionDto.setClientId(getClass().getName());
revisionDto.setVersion(0L); revisionDto.setVersion(0L);
@ -277,9 +297,8 @@ public class NiFiClientUtil {
return; return;
} }
if ("Validating".equals(validationStatus)) { if ("Invalid".equalsIgnoreCase(validationStatus)) {
Thread.sleep(100L); logger.info("Processor with ID {} is currently invalid due to: {}", processorId, entity.getComponent().getValidationErrors());
continue;
} }
Thread.sleep(100L); Thread.sleep(100L);
@ -378,23 +397,7 @@ public class NiFiClientUtil {
} }
for (final ProcessGroupEntity group : rootFlowDTO.getProcessGroups()) { for (final ProcessGroupEntity group : rootFlowDTO.getProcessGroups()) {
waitForProcessorsStopped(group.getComponent()); waitForProcessorsStopped(group.getId());
}
}
private void waitForProcessorsStopped(final ProcessGroupDTO group) throws IOException, NiFiClientException {
final FlowSnippetDTO groupContents = group.getContents();
for (final ProcessorDTO processor : groupContents.getProcessors()) {
try {
waitForStoppedProcessor(processor.getId());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new NiFiClientException("Interrupted while waiting for Processor with ID " + processor.getId() + " to stop");
}
}
for (final ProcessGroupDTO child : groupContents.getProcessGroups()) {
waitForProcessorsStopped(child);
} }
} }
@ -659,4 +662,90 @@ public class NiFiClientUtil {
return nifiClient.getControllerClient().disconnectNode(nodeId, nodeEntity); return nifiClient.getControllerClient().disconnectNode(nodeId, nodeEntity);
} }
public ListingRequestEntity performQueueListing(final String connectionId) throws NiFiClientException, IOException {
try {
ListingRequestEntity listingRequestEntity = nifiClient.getConnectionClient().listQueue(connectionId);
while (listingRequestEntity.getListingRequest().getFinished() != Boolean.TRUE) {
Thread.sleep(10L);
listingRequestEntity = nifiClient.getConnectionClient().getListingRequest(connectionId, listingRequestEntity.getListingRequest().getId());
}
// Delete the listing. Return the previously obtained listing, not the one from the deletion, because the listing request that is returned from deleting the listing does not contain the
// FlowFile Summaries.
nifiClient.getConnectionClient().deleteListingRequest(connectionId, listingRequestEntity.getListingRequest().getId());
return listingRequestEntity;
} catch (final InterruptedException e) {
Assert.fail("Failed to obtain connection status");
return null;
}
}
public FlowFileEntity getQueueFlowFile(final String connectionId, final int flowFileIndex) throws NiFiClientException, IOException {
final ListingRequestEntity listing = performQueueListing(connectionId);
final List<FlowFileSummaryDTO> flowFileSummaries = listing.getListingRequest().getFlowFileSummaries();
if (flowFileIndex >= flowFileSummaries.size()) {
throw new IllegalArgumentException("Cannot retrieve FlowFile with index " + flowFileIndex + " because queue only has " + flowFileSummaries.size() + " FlowFiles");
}
final FlowFileSummaryDTO flowFileSummary = flowFileSummaries.get(flowFileIndex);
final String uuid = flowFileSummary.getUuid();
return nifiClient.getConnectionClient().getFlowFile(connectionId, uuid);
}
public VariableRegistryUpdateRequestEntity updateVariableRegistry(final ProcessGroupEntity processGroup, final Map<String, String> variables) throws NiFiClientException, IOException {
final Set<VariableEntity> variableEntities = new HashSet<>();
for (final Map.Entry<String, String> entry : variables.entrySet()) {
final VariableEntity entity = new VariableEntity();
variableEntities.add(entity);
final VariableDTO dto = new VariableDTO();
dto.setName(entry.getKey());
dto.setValue(entry.getValue());
dto.setProcessGroupId(processGroup.getId());
entity.setVariable(dto);
}
final VariableRegistryDTO variableRegistryDto = new VariableRegistryDTO();
variableRegistryDto.setProcessGroupId(processGroup.getId());
variableRegistryDto.setVariables(variableEntities);
final VariableRegistryEntity registryEntity = new VariableRegistryEntity();
registryEntity.setProcessGroupRevision(processGroup.getRevision());
registryEntity.setVariableRegistry(variableRegistryDto);
VariableRegistryUpdateRequestEntity updateRequestEntity = nifiClient.getProcessGroupClient().updateVariableRegistry(processGroup.getId(), registryEntity);
while (!updateRequestEntity.getRequest().isComplete()) {
try {
Thread.sleep(100L);
} catch (final InterruptedException ie) {
Assert.fail("Interrupted while waiting for variable registry to update");
return null;
}
updateRequestEntity = nifiClient.getProcessGroupClient().getVariableRegistryUpdateRequest(processGroup.getId(), updateRequestEntity.getRequest().getRequestId());
}
if (updateRequestEntity.getRequest().getFailureReason() != null) {
Assert.fail("Failed to update Variable Registry due to: " + updateRequestEntity.getRequest().getFailureReason());
}
nifiClient.getProcessGroupClient().deleteVariableRegistryUpdateRequest(processGroup.getId(), updateRequestEntity.getRequest().getRequestId());
return updateRequestEntity;
}
public ProcessGroupEntity createProcessGroup(final String name, final String parentGroupId) throws NiFiClientException, IOException {
final ProcessGroupDTO component = new ProcessGroupDTO();
component.setName(name);
component.setParentGroupId(parentGroupId);
final ProcessGroupEntity childGroupEntity = new ProcessGroupEntity();
childGroupEntity.setRevision(createNewRevision());
childGroupEntity.setComponent(component);
final ProcessGroupEntity childGroup = nifiClient.getProcessGroupClient().createProcessGroup(parentGroupId, childGroupEntity);
return childGroup;
}
} }

View File

@ -22,8 +22,10 @@ import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientConfig;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.toolkit.cli.impl.client.nifi.impl.JerseyNiFiClient; import org.apache.nifi.toolkit.cli.impl.client.nifi.impl.JerseyNiFiClient;
import org.apache.nifi.web.api.entity.ClusteSummaryEntity; import org.apache.nifi.web.api.entity.ClusteSummaryEntity;
import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.rules.TestName; import org.junit.rules.TestName;
@ -235,4 +237,25 @@ public abstract class NiFiSystemIT {
Thread.sleep(10L); Thread.sleep(10L);
} }
} }
protected void waitForQueueCount(final String connectionId, final int queueSize) throws InterruptedException {
waitFor(() -> {
final ConnectionStatusEntity statusEntity = getConnectionStatus(connectionId);
return statusEntity.getConnectionStatus().getAggregateSnapshot().getFlowFilesQueued() == queueSize;
});
}
private ConnectionStatusEntity getConnectionStatus(final String connectionId) {
try {
return getNifiClient().getFlowClient().getConnectionStatus(connectionId, true);
} catch (final Exception e) {
Assert.fail("Failed to obtain connection status");
return null;
}
}
protected int getConnectionQueueSize(final String connectionId) {
final ConnectionStatusEntity statusEntity = getConnectionStatus(connectionId);
return statusEntity.getConnectionStatus().getAggregateSnapshot().getFlowFilesQueued();
}
} }

View File

@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.tests.system.variables;
import org.apache.nifi.tests.system.NiFiSystemIT;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.web.api.entity.ConnectionEntity;
import org.apache.nifi.web.api.entity.FlowFileEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
import static org.junit.Assert.assertEquals;
public class ProcessGroupVariablesIT extends NiFiSystemIT {
@Test
public void testChangeVariableWhileProcessorRunning() throws NiFiClientException, IOException, InterruptedException {
// Add variable abc=123 to the root group
ProcessGroupEntity rootGroup = getNifiClient().getProcessGroupClient().getProcessGroup("root");
getClientUtil().updateVariableRegistry(rootGroup, Collections.singletonMap("abc", "123"));
// Create GenerateFlowFile that uses Variable to create FlowFile attribute. Connect to CountEvents processor.
final ProcessorEntity generateFlowFile = getClientUtil().createProcessor("GenerateFlowFile");
getClientUtil().updateProcessorProperties(generateFlowFile, Collections.singletonMap("Value", "Hello ${abc}"));
getClientUtil().updateProcessorSchedulingPeriod(generateFlowFile, "10 mins");
final ProcessorEntity countEvents = getClientUtil().createProcessor("CountEvents");
final ConnectionEntity connection = getClientUtil().createConnection(generateFlowFile, countEvents, "success");
getClientUtil().setAutoTerminatedRelationships(countEvents, "success");
// Wait for processor to be valid
getClientUtil().waitForValidProcessor(generateFlowFile.getId());
// Start Processor, wait for 1 FlowFile to be queued up, then stop processor
getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
waitForQueueCount(connection.getId(), 1);
final FlowFileEntity flowFile = getClientUtil().getQueueFlowFile(connection.getId(), 0);
assertEquals("Hello 123", flowFile.getFlowFile().getAttributes().get("Value"));
getClientUtil().emptyQueue(connection.getId());
waitForQueueCount(connection.getId(), 0);
rootGroup = getNifiClient().getProcessGroupClient().getProcessGroup("root");
getClientUtil().updateVariableRegistry(rootGroup, Collections.singletonMap("abc", "xyz"));
waitForQueueCount(connection.getId(), 1);
final FlowFileEntity secondFlowFile = getClientUtil().getQueueFlowFile(connection.getId(), 0);
assertEquals("Hello xyz", secondFlowFile.getFlowFile().getAttributes().get("Value"));
}
@Test
public void testChangeHigherLevelVariableWhileProcessorRunning() throws NiFiClientException, IOException, InterruptedException {
// Add variable abc=123 to the root group
ProcessGroupEntity rootGroup = getNifiClient().getProcessGroupClient().getProcessGroup("root");
getClientUtil().updateVariableRegistry(rootGroup, Collections.singletonMap("abc", "123"));
final ProcessGroupEntity childGroup = getClientUtil().createProcessGroup("child", "root");
// Create GenerateFlowFile that uses Variable to create FlowFile attribute. Connect to CountEvents processor.
final ProcessorEntity generateFlowFile = getClientUtil().createProcessor("GenerateFlowFile", childGroup.getId());
getClientUtil().updateProcessorProperties(generateFlowFile, Collections.singletonMap("Value", "Hello ${abc}"));
getClientUtil().updateProcessorSchedulingPeriod(generateFlowFile, "10 mins");
final ProcessorEntity countEvents = getClientUtil().createProcessor("CountEvents", childGroup.getId());
final ConnectionEntity connection = getClientUtil().createConnection(generateFlowFile, countEvents, "success");
getClientUtil().setAutoTerminatedRelationships(countEvents, "success");
// Wait for processor to be valid
getClientUtil().waitForValidProcessor(generateFlowFile.getId());
// Start Processor, wait for 1 FlowFile to be queued up, then stop processor
getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
waitForQueueCount(connection.getId(), 1);
final FlowFileEntity flowFile = getClientUtil().getQueueFlowFile(connection.getId(), 0);
assertEquals("Hello 123", flowFile.getFlowFile().getAttributes().get("Value"));
getClientUtil().emptyQueue(connection.getId());
waitForQueueCount(connection.getId(), 0);
rootGroup = getNifiClient().getProcessGroupClient().getProcessGroup("root");
getClientUtil().updateVariableRegistry(rootGroup, Collections.singletonMap("abc", "xyz"));
waitForQueueCount(connection.getId(), 1);
final FlowFileEntity secondFlowFile = getClientUtil().getQueueFlowFile(connection.getId(), 0);
assertEquals("Hello xyz", secondFlowFile.getFlowFile().getAttributes().get("Value"));
}
@Test
public void testChangeVariableThatIsOverridden() throws NiFiClientException, IOException, InterruptedException {
// Add variable abc=123 to the root group
ProcessGroupEntity rootGroup = getNifiClient().getProcessGroupClient().getProcessGroup("root");
getClientUtil().updateVariableRegistry(rootGroup, Collections.singletonMap("abc", "123"));
final ProcessGroupEntity childGroup = getClientUtil().createProcessGroup("child", "root");
getClientUtil().updateVariableRegistry(childGroup, Collections.singletonMap("abc", "123"));
// Create GenerateFlowFile that uses Variable to create FlowFile attribute. Connect to CountEvents processor.
final ProcessorEntity generateFlowFile = getClientUtil().createProcessor("GenerateFlowFile", childGroup.getId());
getClientUtil().updateProcessorProperties(generateFlowFile, Collections.singletonMap("Value", "Hello ${abc}"));
getClientUtil().updateProcessorSchedulingPeriod(generateFlowFile, "10 mins");
final ProcessorEntity countEvents = getClientUtil().createProcessor("CountEvents", childGroup.getId());
final ConnectionEntity connection = getClientUtil().createConnection(generateFlowFile, countEvents, "success");
getClientUtil().setAutoTerminatedRelationships(countEvents, "success");
// Wait for processor to be valid
getClientUtil().waitForValidProcessor(generateFlowFile.getId());
// Start Processor, wait for 1 FlowFile to be queued up, then stop processor
getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
waitForQueueCount(connection.getId(), 1);
final FlowFileEntity flowFile = getClientUtil().getQueueFlowFile(connection.getId(), 0);
assertEquals("Hello 123", flowFile.getFlowFile().getAttributes().get("Value"));
getClientUtil().emptyQueue(connection.getId());
waitForQueueCount(connection.getId(), 0);
rootGroup = getNifiClient().getProcessGroupClient().getProcessGroup("root");
getClientUtil().updateVariableRegistry(rootGroup, Collections.singletonMap("abc", "xyz"));
// Wait a bit and ensure that the queue is still empty.
Thread.sleep(2000L);
assertEquals(0, getConnectionQueueSize(connection.getId()));
}
}

View File

@ -28,6 +28,6 @@ java.arg.3=-Xmx128m
java.arg.14=-Djava.awt.headless=true java.arg.14=-Djava.awt.headless=true
#java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8002 java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8002
nifi.bootstrap.sensitive.key= nifi.bootstrap.sensitive.key=

View File

@ -18,6 +18,8 @@ package org.apache.nifi.toolkit.cli.impl.client.nifi;
import org.apache.nifi.web.api.entity.ConnectionEntity; import org.apache.nifi.web.api.entity.ConnectionEntity;
import org.apache.nifi.web.api.entity.DropRequestEntity; import org.apache.nifi.web.api.entity.DropRequestEntity;
import org.apache.nifi.web.api.entity.FlowFileEntity;
import org.apache.nifi.web.api.entity.ListingRequestEntity;
import java.io.IOException; import java.io.IOException;
@ -37,4 +39,12 @@ public interface ConnectionClient {
DropRequestEntity getDropRequest(String connectionId, String dropRequestId) throws NiFiClientException, IOException; DropRequestEntity getDropRequest(String connectionId, String dropRequestId) throws NiFiClientException, IOException;
DropRequestEntity deleteDropRequest(String connectionId, String dropRequestId) throws NiFiClientException, IOException; DropRequestEntity deleteDropRequest(String connectionId, String dropRequestId) throws NiFiClientException, IOException;
ListingRequestEntity listQueue(String connectionId) throws NiFiClientException, IOException;
ListingRequestEntity getListingRequest(String connectionId, String listingRequestId) throws NiFiClientException, IOException;
ListingRequestEntity deleteListingRequest(String connectionId, String listingRequestId) throws NiFiClientException, IOException;
FlowFileEntity getFlowFile(String connectionId, String flowFileUuid) throws NiFiClientException, IOException;
} }

View File

@ -21,6 +21,8 @@ import org.apache.nifi.toolkit.cli.impl.client.nifi.ConnectionClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.web.api.entity.ConnectionEntity; import org.apache.nifi.web.api.entity.ConnectionEntity;
import org.apache.nifi.web.api.entity.DropRequestEntity; import org.apache.nifi.web.api.entity.DropRequestEntity;
import org.apache.nifi.web.api.entity.FlowFileEntity;
import org.apache.nifi.web.api.entity.ListingRequestEntity;
import javax.ws.rs.client.Entity; import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget; import javax.ws.rs.client.WebTarget;
@ -184,4 +186,78 @@ public class JerseyConnectionClient extends AbstractJerseyClient implements Conn
}); });
} }
@Override
public ListingRequestEntity listQueue(final String connectionId) throws NiFiClientException, IOException {
if (connectionId == null) {
throw new IllegalArgumentException("Connection ID cannot be null");
}
return executeAction("Error listing queue for Connection", () -> {
final WebTarget target = flowFileQueueTarget
.path("listing-requests")
.resolveTemplate("id", connectionId);
return getRequestBuilder(target).post(
Entity.entity(connectionId, MediaType.TEXT_PLAIN),
ListingRequestEntity.class
);
});
}
@Override
public ListingRequestEntity getListingRequest(final String connectionId, final String listingRequestId) throws NiFiClientException, IOException {
if (connectionId == null) {
throw new IllegalArgumentException("Connection ID cannot be null");
}
if (listingRequestId == null) {
throw new IllegalArgumentException("Listing Request ID cannot be null");
}
return executeAction("Error retrieving Listing Request", () -> {
final WebTarget target = flowFileQueueTarget
.path("listing-requests/{requestId}")
.resolveTemplate("id", connectionId)
.resolveTemplate("requestId", listingRequestId);
return getRequestBuilder(target).get(ListingRequestEntity.class);
});
}
@Override
public ListingRequestEntity deleteListingRequest(final String connectionId, final String listingRequestId) throws NiFiClientException, IOException {
if (connectionId == null) {
throw new IllegalArgumentException("Connection ID cannot be null");
}
if (listingRequestId == null) {
throw new IllegalArgumentException("Listing Request ID cannot be null");
}
return executeAction("Error retrieving Listing Request", () -> {
final WebTarget target = flowFileQueueTarget
.path("listing-requests/{requestId}")
.resolveTemplate("id", connectionId)
.resolveTemplate("requestId", listingRequestId);
return getRequestBuilder(target).delete(ListingRequestEntity.class);
});
}
@Override
public FlowFileEntity getFlowFile(final String connectionId, final String flowFileUuid) throws NiFiClientException, IOException {
if (connectionId == null) {
throw new IllegalArgumentException("Connection ID cannot be null");
}
if (flowFileUuid == null) {
throw new IllegalArgumentException("FlowFile UUID cannot be null");
}
return executeAction("Error retrieving FlowFile", () -> {
final WebTarget target = flowFileQueueTarget
.path("flowfiles/{uuid}")
.resolveTemplate("id", connectionId)
.resolveTemplate("uuid", flowFileUuid);
return getRequestBuilder(target).get(FlowFileEntity.class);
});
}
} }