mirror of https://github.com/apache/nifi.git
NIFI-6025: Include Processor 'scheduled state' (i.e., Enabled or Disabled) in the VersionedProcessor when pushing to Flow Registry and take into account when updating flows on the NiFi side
NIFI-6025: Include difference in Scheduled State as a Local Flow Difference This closes #3546. Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
parent
01503b5187
commit
4ae1fec78a
|
@ -458,6 +458,8 @@ public final class StandardProcessGroup implements ProcessGroup {
|
|||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
|
||||
onComponentModified();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -477,6 +479,8 @@ public final class StandardProcessGroup implements ProcessGroup {
|
|||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
|
||||
onComponentModified();
|
||||
}
|
||||
|
||||
private StateManager getStateManager(final String componentId) {
|
||||
|
@ -4544,6 +4548,12 @@ public final class StandardProcessGroup implements ProcessGroup {
|
|||
processor.setYieldPeriod(proposed.getYieldDuration());
|
||||
processor.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
|
||||
|
||||
if (proposed.getScheduledState() == org.apache.nifi.registry.flow.ScheduledState.DISABLED) {
|
||||
disableProcessor(processor);
|
||||
} else if (processor.getScheduledState() == ScheduledState.DISABLED) {
|
||||
enableProcessor(processor);
|
||||
}
|
||||
|
||||
if (!isEqual(processor.getBundleCoordinate(), proposed.getBundle())) {
|
||||
final BundleCoordinate newBundleCoordinate = toCoordinate(proposed.getBundle());
|
||||
final List<PropertyDescriptor> descriptors = new ArrayList<>(processor.getProperties().keySet());
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.nifi.connectable.Port;
|
|||
import org.apache.nifi.controller.ComponentNode;
|
||||
import org.apache.nifi.controller.ControllerService;
|
||||
import org.apache.nifi.controller.ProcessorNode;
|
||||
import org.apache.nifi.controller.ScheduledState;
|
||||
import org.apache.nifi.controller.PropertyConfiguration;
|
||||
import org.apache.nifi.controller.label.Label;
|
||||
import org.apache.nifi.controller.queue.FlowFileQueue;
|
||||
|
@ -544,6 +545,8 @@ public class NiFiRegistryFlowMapper {
|
|||
processor.setSchedulingStrategy(procNode.getSchedulingStrategy().name());
|
||||
processor.setStyle(procNode.getStyle());
|
||||
processor.setYieldDuration(procNode.getYieldPeriod());
|
||||
processor.setScheduledState(procNode.getScheduledState() == ScheduledState.DISABLED ? org.apache.nifi.registry.flow.ScheduledState.DISABLED
|
||||
: org.apache.nifi.registry.flow.ScheduledState.ENABLED);
|
||||
|
||||
return processor;
|
||||
}
|
||||
|
|
|
@ -1214,7 +1214,7 @@ public class VersionsResource extends ApplicationResource {
|
|||
} catch (final ResumeFlowException rfe) {
|
||||
// Treat ResumeFlowException differently because we don't want to include a message that we couldn't update the flow
|
||||
// since in this case the flow was successfully updated - we just couldn't re-enable the components.
|
||||
logger.error(rfe.getMessage(), rfe);
|
||||
logger.warn(rfe.getMessage(), rfe);
|
||||
vcur.fail(rfe.getMessage());
|
||||
} catch (final Exception e) {
|
||||
logger.error("Failed to update flow to new version", e);
|
||||
|
@ -1415,7 +1415,7 @@ public class VersionsResource extends ApplicationResource {
|
|||
} catch (final ResumeFlowException rfe) {
|
||||
// Treat ResumeFlowException differently because we don't want to include a message that we couldn't update the flow
|
||||
// since in this case the flow was successfully updated - we just couldn't re-enable the components.
|
||||
logger.error(rfe.getMessage(), rfe);
|
||||
logger.warn(rfe.getMessage(), rfe);
|
||||
vcur.fail(rfe.getMessage());
|
||||
} catch (final Exception e) {
|
||||
logger.error("Failed to update flow to new version", e);
|
||||
|
@ -1586,7 +1586,7 @@ public class VersionsResource extends ApplicationResource {
|
|||
} catch (final IllegalStateException ise) {
|
||||
// Component Lifecycle will re-enable the Controller Services only if they are valid. If IllegalStateException gets thrown, we need to provide
|
||||
// a more intelligent error message as to exactly what happened, rather than indicate that the flow could not be updated.
|
||||
throw new ResumeFlowException("Failed to re-enable Controller Services because " + ise.getMessage(), ise);
|
||||
throw new ResumeFlowException("Successfully updated flow but could not re-enable all Controller Services because " + ise.getMessage(), ise);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1638,7 +1638,7 @@ public class VersionsResource extends ApplicationResource {
|
|||
} catch (final IllegalStateException ise) {
|
||||
// Component Lifecycle will restart the Processors only if they are valid. If IllegalStateException gets thrown, we need to provide
|
||||
// a more intelligent error message as to exactly what happened, rather than indicate that the flow could not be updated.
|
||||
throw new ResumeFlowException("Failed to restart components because " + ise.getMessage(), ise);
|
||||
throw new ResumeFlowException("Successfully updated flow but could not restart all Processors because " + ise.getMessage(), ise);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue