NIFI-1963 Allows a node reconnecting to the cluster to inherit non-fingerprinted processor settings

Forces a node reconnecting to a cluster to serialize the updated flow to disk
Added most processor settings to the flow fingerprint (excluding name, style, comment, position, and schedule state)
Updated some test data for FingerprintFactoryTest to test for new fields added to the flow fingerprint
Updated StandardProcessorNode to allow processor comments and name to be settable while a processor is running
Updated StandardFlowSynchronizer to inherit non-fingerprinted processor settings (name, style, comment, and position) when flow is already synchronized
This closes #1812
This commit is contained in:
Jeff Storck 2017-05-15 17:21:11 -04:00 committed by Matt Gilman
parent 289dde098e
commit 8e1c79eaaf
No known key found for this signature in database
GPG Key ID: DF61EC19432AEE37
7 changed files with 80 additions and 24 deletions

View File

@ -643,7 +643,9 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
clusterCoordinator.resetNodeStatuses(connectionResponse.getNodeConnectionStatuses().stream()
.collect(Collectors.toMap(status -> status.getNodeIdentifier(), status -> status)));
controller.resumeHeartbeats(); // we are now connected, so resume sending heartbeats.
// reconnected, this node needs to explicitly write the inherited flow to disk, and resume heartbeats
saveFlowChanges();
controller.resumeHeartbeats();
logger.info("Node reconnected.");
} catch (final Exception ex) {

View File

@ -725,6 +725,8 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
final ProcessorDTO dto = FlowFromDOMFactory.getProcessor(processorElement, encryptor);
final ProcessorNode procNode = processGroup.getProcessor(dto.getId());
updateNonFingerprintedProcessorSettings(procNode, dto);
if (!procNode.getScheduledState().name().equals(dto.getState())) {
try {
switch (ScheduledState.valueOf(dto.getState())) {
@ -964,15 +966,12 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
private void updateProcessor(final ProcessorNode procNode, final ProcessorDTO processorDTO, final ProcessGroup processGroup, final FlowController controller)
throws ProcessorInstantiationException {
final ProcessorConfigDTO config = processorDTO.getConfig();
procNode.setPosition(toPosition(processorDTO.getPosition()));
procNode.setName(processorDTO.getName());
procNode.setStyle(processorDTO.getStyle());
procNode.setProcessGroup(processGroup);
procNode.setComments(config.getComments());
procNode.setLossTolerant(config.isLossTolerant());
procNode.setPenalizationPeriod(config.getPenaltyDuration());
procNode.setYieldPeriod(config.getYieldDuration());
procNode.setBulletinLevel(LogLevel.valueOf(config.getBulletinLevel()));
updateNonFingerprintedProcessorSettings(procNode, processorDTO);
if (config.getSchedulingStrategy() != null) {
procNode.setSchedulingStrategy(SchedulingStrategy.valueOf(config.getSchedulingStrategy()));
@ -1011,6 +1010,13 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
}
}
private void updateNonFingerprintedProcessorSettings(final ProcessorNode procNode, final ProcessorDTO processorDTO) {
procNode.setName(processorDTO.getName());
procNode.setPosition(toPosition(processorDTO.getPosition()));
procNode.setStyle(processorDTO.getStyle());
procNode.setComments(processorDTO.getConfig().getComments());
}
private ProcessGroup addProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement,
final StringEncryptor encryptor, final FlowEncodingVersion encodingVersion) throws ProcessorInstantiationException {
// get the parent group ID

View File

@ -256,9 +256,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
*/
@Override
public synchronized void setComments(final String comments) {
if (isRunning()) {
throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
}
this.comments.set(CharacterFilterUtils.filterInvalidXmlCharacters(comments));
}
@ -405,9 +402,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
@Override
public synchronized void setName(final String name) {
if (isRunning()) {
throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
}
super.setName(name);
}

View File

@ -341,6 +341,25 @@ public class FingerprintFactory {
final BundleDTO bundle = FlowFromDOMFactory.getBundle(DomUtils.getChild(processorElem, "bundle"));
addBundleFingerprint(builder, bundle);
// max concurrent tasks
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "maxConcurrentTasks"));
// scheduling period
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "schedulingPeriod"));
// penalization period
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "penalizationPeriod"));
// yield period
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "yieldPeriod"));
// bulletin level
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "bulletinLevel"));
// loss tolerant
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "lossTolerant"));
// scheduling strategy
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "schedulingStrategy"));
// execution node
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "executionNode"));
// run duration nanos
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "runDurationNanos"));
// get the temp instance of the Processor so that we know the default property values
final BundleCoordinate coordinate = getCoordinate(className, bundle);
final ConfigurableComponent configurableComponent = ExtensionManager.getTempComponent(className, coordinate);

View File

@ -25,13 +25,24 @@
<id>d89ada5d-35fb-44ff-83f1-4cc00b48b2df</id>
<name>GenerateFlowFile</name>
<position x="0.0" y="0.0"/>
<style>processor</style>
<styles/>
<comment/>
<class>org.apache.nifi.processors.standard.GenerateFlowFile</class>
<bundle>
<group>org.apache.nifi</group>
<artifact>nifi-standard-nar</artifact>
<version>1.3.0-SNAPSHOT</version>
</bundle>
<maxConcurrentTasks>1</maxConcurrentTasks>
<schedulingPeriod>0 s</schedulingPeriod>
<schedulingPeriod>0 sec</schedulingPeriod>
<penalizationPeriod>30 sec</penalizationPeriod>
<yieldPeriod>1 sec</yieldPeriod>
<bulletinLevel>WARN</bulletinLevel>
<lossTolerant>false</lossTolerant>
<running>false</running>
<scheduledState>RUNNING</scheduledState>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<executionNode>ALL</executionNode>
<runDurationNanos>0</runDurationNanos>
<property>
<name>file.size</name>
<value>5</value>

View File

@ -23,15 +23,28 @@
<comment/>
<processor>
<id>d89ada5d-35fb-44ff-83f1-4cc00b48b2df</id>
<name>GenerateFlowFile</name>
<position x="0.0" y="0.0"/>
<style>processor</style>
<comment/>
<name>GenerateFlowFile1</name>
<position x="0.0" y="1.0"/>
<styles>
<style name="background-color">#00ff00</style>
</styles>
<comment>this is a comment</comment>
<class>org.apache.nifi.processors.standard.GenerateFlowFile</class>
<bundle>
<group>org.apache.nifi</group>
<artifact>nifi-standard-nar</artifact>
<version>1.3.0-SNAPSHOT</version>
</bundle>
<maxConcurrentTasks>1</maxConcurrentTasks>
<schedulingPeriod>0 s</schedulingPeriod>
<schedulingPeriod>0 sec</schedulingPeriod>
<penalizationPeriod>30 sec</penalizationPeriod>
<yieldPeriod>1 sec</yieldPeriod>
<bulletinLevel>WARN</bulletinLevel>
<lossTolerant>false</lossTolerant>
<running>false</running>
<scheduledState>DISABLED</scheduledState>
<schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
<executionNode>ALL</executionNode>
<runDurationNanos>0</runDurationNanos>
<property>
<name>file.size</name>
<value>5</value>
@ -45,7 +58,7 @@
<style>processor</style>
<comment/>
<class>org.apache.nifi.processors.standard.LogAttribute</class>
<maxConcurrentTasks>10</maxConcurrentTasks>
<maxConcurrentTasks>1</maxConcurrentTasks>
<schedulingPeriod>0 s</schedulingPeriod>
<lossTolerant>false</lossTolerant>
<running>false</running>

View File

@ -25,13 +25,24 @@
<id>d89ada5d-35fb-44ff-83f1-4cc00b48b2dd</id>
<name>GenerateFlowFile</name>
<position x="0.0" y="0.0"/>
<style>processor</style>
<styles/>
<comment/>
<class>org.apache.nifi.processors.standard.GenerateFlowFile</class>
<bundle>
<group>org.apache.nifi</group>
<artifact>nifi-standard-nar</artifact>
<version>1.4.0-SNAPSHOT</version>
</bundle>
<maxConcurrentTasks>1</maxConcurrentTasks>
<schedulingPeriod>0 s</schedulingPeriod>
<lossTolerant>false</lossTolerant>
<running>false</running>
<penalizationPeriod>30 s</penalizationPeriod>
<yieldPeriod>1 s</yieldPeriod>
<bulletinLevel>ERROR</bulletinLevel>
<lossTolerant>true</lossTolerant>
<scheduledState>RUNNING</scheduledState>
<schedulingStrategy>CRON_DRIVEN</schedulingStrategy>
<executionNode>PRIMARY</executionNode>
<runDurationNanos>1</runDurationNanos>
<property>
<name>file.size</name>
<value>5</value>