NIFI-9393 Set Port Scheduled State for Flow Definitions

- Set Scheduled State for Versioned Port and Versioned Remote Port when mapping Flow Definition
- Updated StandardProcessGroup to set disable Port based on Scheduled State of DISABLED
- Updated StandardProcessGroup to set Remote Port transmitting based on Scheduled State of ENABLED

Signed-off-by: Nathan Gough <thenatog@gmail.com>

This closes #5534.
This commit is contained in:
exceptionfactory 2021-11-18 14:04:17 -06:00 committed by Nathan Gough
parent 839fbf7d19
commit 0e09750a4d
3 changed files with 22 additions and 2 deletions

View File

@ -4931,6 +4931,9 @@ public final class StandardProcessGroup implements ProcessGroup {
port.setName(name);
port.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
port.setMaxConcurrentTasks(proposed.getConcurrentlySchedulableTaskCount());
if (org.apache.nifi.flow.ScheduledState.DISABLED == proposed.getScheduledState()) {
port.disable();
}
}
private Port addInputPort(final ProcessGroup destination, final VersionedPort proposed, final String componentIdSeed, final String temporaryName) {
@ -5186,6 +5189,10 @@ public final class StandardProcessGroup implements ProcessGroup {
descriptor.setId(generateUuid(proposed.getIdentifier(), rpgId, componentIdSeed));
descriptor.setName(proposed.getName());
descriptor.setUseCompression(proposed.isUseCompression());
final boolean transmitting = org.apache.nifi.flow.ScheduledState.ENABLED == proposed.getScheduledState();
descriptor.setTransmitting(transmitting);
return descriptor;
}

View File

@ -581,6 +581,7 @@ public class NiFiRegistryFlowMapper {
versionedPort.setName(port.getName());
versionedPort.setPosition(mapPosition(port.getPosition()));
versionedPort.setType(PortType.valueOf(port.getConnectableType().name()));
versionedPort.setScheduledState(mapScheduledState(port.getScheduledState()));
if (port instanceof PublicPort) {
versionedPort.setAllowRemoteAccess(true);
@ -621,8 +622,7 @@ public class NiFiRegistryFlowMapper {
processor.setSchedulingStrategy(procNode.getSchedulingStrategy().name());
processor.setStyle(procNode.getStyle());
processor.setYieldDuration(procNode.getYieldPeriod());
processor.setScheduledState(procNode.getScheduledState() == ScheduledState.DISABLED ? org.apache.nifi.flow.ScheduledState.DISABLED
: org.apache.nifi.flow.ScheduledState.ENABLED);
processor.setScheduledState(mapScheduledState(procNode.getScheduledState()));
return processor;
}
@ -664,6 +664,7 @@ public class NiFiRegistryFlowMapper {
port.setBatchSize(mapBatchSettings(remotePort));
port.setTargetId(remotePort.getTargetIdentifier());
port.setComponentType(componentType);
port.setScheduledState(mapScheduledState(remotePort.getScheduledState()));
return port;
}
@ -730,4 +731,10 @@ public class NiFiRegistryFlowMapper {
versionedParameter.setValue(descriptor.isSensitive() ? null : parameter.getValue());
return versionedParameter;
}
private org.apache.nifi.flow.ScheduledState mapScheduledState(final ScheduledState scheduledState) {
return scheduledState == ScheduledState.DISABLED
? org.apache.nifi.flow.ScheduledState.DISABLED
: org.apache.nifi.flow.ScheduledState.ENABLED;
}
}

View File

@ -34,6 +34,7 @@ import org.apache.nifi.connectable.Size;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.PropertyConfiguration;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.LoadBalanceCompression;
@ -436,6 +437,7 @@ public class NiFiRegistryFlowMapperTest {
prepareComponentAuthorizable(port, processGroupId);
preparePositionable(port);
prepareConnectable(port, ConnectableType.valueOf(portType.name()));
when(port.getScheduledState()).thenReturn(ScheduledState.RUNNING);
return port;
}
@ -532,6 +534,7 @@ public class NiFiRegistryFlowMapperTest {
prepareComponentAuthorizable(remoteGroupPort, remoteProcessGroup.getIdentifier());
when(remoteGroupPort.getName()).thenReturn("remotePort" + (counter++));
when(remoteGroupPort.getRemoteProcessGroup()).thenReturn(remoteProcessGroup);
when(remoteGroupPort.getScheduledState()).thenReturn(ScheduledState.DISABLED);
return remoteGroupPort;
}
@ -751,6 +754,7 @@ public class NiFiRegistryFlowMapperTest {
assertEquals(port.getPosition().getY(), versionedPort.getPosition().getY(), 0);
assertEquals(port.getName(), versionedPort.getName());
assertEquals(portType, versionedPort.getType());
assertEquals(org.apache.nifi.flow.ScheduledState.ENABLED, versionedPort.getScheduledState());
}
}
@ -767,6 +771,8 @@ public class NiFiRegistryFlowMapperTest {
assertEquals(expectedPortGroupIdentifier, versionedRemotePort.getGroupIdentifier());
assertEquals(remotePort.getName(), versionedRemotePort.getName());
assertEquals(componentType, versionedRemotePort.getComponentType());
assertNotNull(versionedRemotePort.getScheduledState());
assertEquals(remotePort.getScheduledState().name(), versionedRemotePort.getScheduledState().name());
}
}