NIFI-2493 This closes #798. Do not fingerprint Remote Ports' running state. When synchronizing remote flow with local flow, start/stop remote group ports as appropriate based on the inherited flow

This commit is contained in:
Mark Payne 2016-08-05 15:52:45 -04:00 committed by joewitt
parent 8752d11f18
commit ed14bf22e7
2 changed files with 51 additions and 2 deletions

View File

@ -772,6 +772,56 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
} }
} }
// Update scheduled state of Remote Group Ports
final List<Element> remoteProcessGroupList = getChildrenByTagName(processGroupElement, "remoteProcessGroup");
for (final Element remoteGroupElement : remoteProcessGroupList) {
final RemoteProcessGroupDTO remoteGroupDto = FlowFromDOMFactory.getRemoteProcessGroup(remoteGroupElement, encryptor);
final RemoteProcessGroup rpg = processGroup.getRemoteProcessGroup(remoteGroupDto.getId());
// input ports
final List<Element> inputPortElements = getChildrenByTagName(remoteGroupElement, "inputPort");
for (final Element inputPortElement : inputPortElements) {
final RemoteProcessGroupPortDescriptor portDescriptor = FlowFromDOMFactory.getRemoteProcessGroupPort(inputPortElement);
final String inputPortId = portDescriptor.getId();
final RemoteGroupPort inputPort = rpg.getInputPort(inputPortId);
if (inputPort == null) {
continue;
}
if (portDescriptor.isTransmitting()) {
if (inputPort.getScheduledState() != ScheduledState.RUNNING && inputPort.getScheduledState() != ScheduledState.STARTING) {
rpg.startTransmitting(inputPort);
}
} else {
if (inputPort.getScheduledState() != ScheduledState.STOPPED && inputPort.getScheduledState() != ScheduledState.STOPPING) {
rpg.stopTransmitting(inputPort);
}
}
}
// output ports
final List<Element> outputPortElements = getChildrenByTagName(remoteGroupElement, "outputPort");
for (final Element outputPortElement : outputPortElements) {
final RemoteProcessGroupPortDescriptor portDescriptor = FlowFromDOMFactory.getRemoteProcessGroupPort(outputPortElement);
final String outputPortId = portDescriptor.getId();
final RemoteGroupPort outputPort = rpg.getOutputPort(outputPortId);
if (outputPort == null) {
continue;
}
if (portDescriptor.isTransmitting()) {
if (outputPort.getScheduledState() != ScheduledState.RUNNING && outputPort.getScheduledState() != ScheduledState.STARTING) {
rpg.startTransmitting(outputPort);
}
} else {
if (outputPort.getScheduledState() != ScheduledState.STOPPED && outputPort.getScheduledState() != ScheduledState.STOPPING) {
rpg.stopTransmitting(outputPort);
}
}
}
}
// add labels // add labels
final List<Element> labelNodeList = getChildrenByTagName(processGroupElement, "label"); final List<Element> labelNodeList = getChildrenByTagName(processGroupElement, "label");
for (final Element labelElement : labelNodeList) { for (final Element labelElement : labelNodeList) {

View File

@ -778,7 +778,7 @@ public final class FingerprintFactory {
} }
private StringBuilder addRemoteGroupPortFingerprint(final StringBuilder builder, final Element remoteGroupPortElement) { private StringBuilder addRemoteGroupPortFingerprint(final StringBuilder builder, final Element remoteGroupPortElement) {
for (final String childName : new String[]{"id", "scheduledState", "maxConcurrentTasks", "useCompression"}) { for (final String childName : new String[] {"id", "maxConcurrentTasks", "useCompression"}) {
appendFirstValue(builder, DomUtils.getChildNodesByTagName(remoteGroupPortElement, childName)); appendFirstValue(builder, DomUtils.getChildNodesByTagName(remoteGroupPortElement, childName));
} }
@ -787,7 +787,6 @@ public final class FingerprintFactory {
private StringBuilder addRemoteGroupPortFingerprint(final StringBuilder builder, final RemoteProcessGroupPortDTO port) { private StringBuilder addRemoteGroupPortFingerprint(final StringBuilder builder, final RemoteProcessGroupPortDTO port) {
builder.append(port.getId()); builder.append(port.getId());
builder.append(Boolean.TRUE.equals(port.isTransmitting()) ? "RUNNING" : "STOPPED");
builder.append(port.getConcurrentlySchedulableTaskCount()); builder.append(port.getConcurrentlySchedulableTaskCount());
builder.append(port.getUseCompression()); builder.append(port.getUseCompression());
return builder; return builder;