mirror of https://github.com/apache/nifi.git
NIFI-3900: Ensure that when we serialize a flow to send to Cluster Coordinator that we include the Scheduled State for processors as they are intended to be, not as they are currently because during startup they may not have been started yet.
NIFI-3900: Rebased against master and updated new unit test to use new method signature for FlowSerializer.serialize This closes #1804. Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
parent
ac8e57259f
commit
494a0e8928
|
@ -138,6 +138,7 @@ import org.apache.nifi.controller.serialization.FlowSerializationException;
|
||||||
import org.apache.nifi.controller.serialization.FlowSerializer;
|
import org.apache.nifi.controller.serialization.FlowSerializer;
|
||||||
import org.apache.nifi.controller.serialization.FlowSynchronizationException;
|
import org.apache.nifi.controller.serialization.FlowSynchronizationException;
|
||||||
import org.apache.nifi.controller.serialization.FlowSynchronizer;
|
import org.apache.nifi.controller.serialization.FlowSynchronizer;
|
||||||
|
import org.apache.nifi.controller.serialization.ScheduledStateLookup;
|
||||||
import org.apache.nifi.controller.service.ControllerServiceInvocationHandler;
|
import org.apache.nifi.controller.service.ControllerServiceInvocationHandler;
|
||||||
import org.apache.nifi.controller.service.ControllerServiceNode;
|
import org.apache.nifi.controller.service.ControllerServiceNode;
|
||||||
import org.apache.nifi.controller.service.ControllerServiceProvider;
|
import org.apache.nifi.controller.service.ControllerServiceProvider;
|
||||||
|
@ -1509,7 +1510,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
||||||
public void serialize(final FlowSerializer serializer, final OutputStream os) throws FlowSerializationException {
|
public void serialize(final FlowSerializer serializer, final OutputStream os) throws FlowSerializationException {
|
||||||
readLock.lock();
|
readLock.lock();
|
||||||
try {
|
try {
|
||||||
serializer.serialize(this, os);
|
final ScheduledStateLookup scheduledStateLookup = procNode -> startConnectablesAfterInitialization.contains(procNode) ? ScheduledState.RUNNING : procNode.getScheduledState();
|
||||||
|
serializer.serialize(this, os, scheduledStateLookup);
|
||||||
} finally {
|
} finally {
|
||||||
readLock.unlock();
|
readLock.unlock();
|
||||||
}
|
}
|
||||||
|
@ -2932,6 +2934,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
||||||
throw new IllegalStateException("Cannot find ProcessorNode with ID " + processorId + " within ProcessGroup with ID " + parentGroupId);
|
throw new IllegalStateException("Cannot find ProcessorNode with ID " + processorId + " within ProcessGroup with ID " + parentGroupId);
|
||||||
}
|
}
|
||||||
group.stopProcessor(node);
|
group.stopProcessor(node);
|
||||||
|
// If we are ready to start the processor upon initialization of the controller, don't.
|
||||||
|
startConnectablesAfterInitialization.remove(node);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void stopAllProcessors() {
|
public void stopAllProcessors() {
|
||||||
|
|
|
@ -1006,6 +1006,8 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
|
||||||
controller.startProcessor(processGroup.getIdentifier(), procNode.getIdentifier());
|
controller.startProcessor(processGroup.getIdentifier(), procNode.getIdentifier());
|
||||||
} else if (ScheduledState.DISABLED.equals(scheduledState)) {
|
} else if (ScheduledState.DISABLED.equals(scheduledState)) {
|
||||||
processGroup.disableProcessor(procNode);
|
processGroup.disableProcessor(procNode);
|
||||||
|
} else if (ScheduledState.STOPPED.equals(scheduledState)) {
|
||||||
|
controller.stopProcessor(processGroup.getIdentifier(), procNode.getIdentifier());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -34,9 +34,10 @@ public interface FlowSerializer {
|
||||||
*
|
*
|
||||||
* @param controller a controller
|
* @param controller a controller
|
||||||
* @param os an output stream to write the configuration to
|
* @param os an output stream to write the configuration to
|
||||||
|
* @param stateLookup a lookup that can be used to determine the ScheduledState of a Processor
|
||||||
*
|
*
|
||||||
* @throws FlowSerializationException if serialization failed
|
* @throws FlowSerializationException if serialization failed
|
||||||
*/
|
*/
|
||||||
void serialize(FlowController controller, OutputStream os) throws FlowSerializationException;
|
void serialize(FlowController controller, OutputStream os, ScheduledStateLookup stateLookup) throws FlowSerializationException;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,28 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.nifi.controller.serialization;
|
||||||
|
|
||||||
|
import org.apache.nifi.controller.ProcessorNode;
|
||||||
|
import org.apache.nifi.controller.ScheduledState;
|
||||||
|
|
||||||
|
public interface ScheduledStateLookup {
|
||||||
|
|
||||||
|
ScheduledState getScheduledState(ProcessorNode procNode);
|
||||||
|
|
||||||
|
public static final ScheduledStateLookup IDENTITY_LOOKUP = ProcessorNode::getScheduledState;
|
||||||
|
}
|
|
@ -79,7 +79,7 @@ public class StandardFlowSerializer implements FlowSerializer {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void serialize(final FlowController controller, final OutputStream os) throws FlowSerializationException {
|
public void serialize(final FlowController controller, final OutputStream os, final ScheduledStateLookup scheduledStateLookup) throws FlowSerializationException {
|
||||||
try {
|
try {
|
||||||
// create a new, empty document
|
// create a new, empty document
|
||||||
final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
|
final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
|
||||||
|
@ -94,7 +94,7 @@ public class StandardFlowSerializer implements FlowSerializer {
|
||||||
doc.appendChild(rootNode);
|
doc.appendChild(rootNode);
|
||||||
addTextElement(rootNode, "maxTimerDrivenThreadCount", controller.getMaxTimerDrivenThreadCount());
|
addTextElement(rootNode, "maxTimerDrivenThreadCount", controller.getMaxTimerDrivenThreadCount());
|
||||||
addTextElement(rootNode, "maxEventDrivenThreadCount", controller.getMaxEventDrivenThreadCount());
|
addTextElement(rootNode, "maxEventDrivenThreadCount", controller.getMaxEventDrivenThreadCount());
|
||||||
addProcessGroup(rootNode, controller.getGroup(controller.getRootGroupId()), "rootGroup");
|
addProcessGroup(rootNode, controller.getGroup(controller.getRootGroupId()), "rootGroup", scheduledStateLookup);
|
||||||
|
|
||||||
// Add root-level controller services
|
// Add root-level controller services
|
||||||
final Element controllerServicesNode = doc.createElement("controllerServices");
|
final Element controllerServicesNode = doc.createElement("controllerServices");
|
||||||
|
@ -144,7 +144,7 @@ public class StandardFlowSerializer implements FlowSerializer {
|
||||||
parentElement.appendChild(element);
|
parentElement.appendChild(element);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addProcessGroup(final Element parentElement, final ProcessGroup group, final String elementName) {
|
private void addProcessGroup(final Element parentElement, final ProcessGroup group, final String elementName, final ScheduledStateLookup scheduledStateLookup) {
|
||||||
final Document doc = parentElement.getOwnerDocument();
|
final Document doc = parentElement.getOwnerDocument();
|
||||||
final Element element = doc.createElement(elementName);
|
final Element element = doc.createElement(elementName);
|
||||||
parentElement.appendChild(element);
|
parentElement.appendChild(element);
|
||||||
|
@ -154,7 +154,7 @@ public class StandardFlowSerializer implements FlowSerializer {
|
||||||
addTextElement(element, "comment", group.getComments());
|
addTextElement(element, "comment", group.getComments());
|
||||||
|
|
||||||
for (final ProcessorNode processor : group.getProcessors()) {
|
for (final ProcessorNode processor : group.getProcessors()) {
|
||||||
addProcessor(element, processor);
|
addProcessor(element, processor, scheduledStateLookup);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (group.isRootGroup()) {
|
if (group.isRootGroup()) {
|
||||||
|
@ -184,7 +184,7 @@ public class StandardFlowSerializer implements FlowSerializer {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (final ProcessGroup childGroup : group.getProcessGroups()) {
|
for (final ProcessGroup childGroup : group.getProcessGroups()) {
|
||||||
addProcessGroup(element, childGroup, "processGroup");
|
addProcessGroup(element, childGroup, "processGroup", scheduledStateLookup);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (final RemoteProcessGroup remoteRef : group.getRemoteProcessGroups()) {
|
for (final RemoteProcessGroup remoteRef : group.getRemoteProcessGroups()) {
|
||||||
|
@ -363,7 +363,7 @@ public class StandardFlowSerializer implements FlowSerializer {
|
||||||
parentElement.appendChild(element);
|
parentElement.appendChild(element);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addProcessor(final Element parentElement, final ProcessorNode processor) {
|
private void addProcessor(final Element parentElement, final ProcessorNode processor, final ScheduledStateLookup scheduledStateLookup) {
|
||||||
final Document doc = parentElement.getOwnerDocument();
|
final Document doc = parentElement.getOwnerDocument();
|
||||||
final Element element = doc.createElement("processor");
|
final Element element = doc.createElement("processor");
|
||||||
parentElement.appendChild(element);
|
parentElement.appendChild(element);
|
||||||
|
@ -384,7 +384,7 @@ public class StandardFlowSerializer implements FlowSerializer {
|
||||||
addTextElement(element, "yieldPeriod", processor.getYieldPeriod());
|
addTextElement(element, "yieldPeriod", processor.getYieldPeriod());
|
||||||
addTextElement(element, "bulletinLevel", processor.getBulletinLevel().toString());
|
addTextElement(element, "bulletinLevel", processor.getBulletinLevel().toString());
|
||||||
addTextElement(element, "lossTolerant", String.valueOf(processor.isLossTolerant()));
|
addTextElement(element, "lossTolerant", String.valueOf(processor.isLossTolerant()));
|
||||||
addTextElement(element, "scheduledState", processor.getScheduledState().name());
|
addTextElement(element, "scheduledState", scheduledStateLookup.getScheduledState(processor).name());
|
||||||
addTextElement(element, "schedulingStrategy", processor.getSchedulingStrategy().name());
|
addTextElement(element, "schedulingStrategy", processor.getSchedulingStrategy().name());
|
||||||
addTextElement(element, "executionNode", processor.getExecutionNode().name());
|
addTextElement(element, "executionNode", processor.getExecutionNode().name());
|
||||||
addTextElement(element, "runDurationNanos", processor.getRunDuration(TimeUnit.NANOSECONDS));
|
addTextElement(element, "runDurationNanos", processor.getRunDuration(TimeUnit.NANOSECONDS));
|
||||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.nifi.cluster.protocol.StandardDataFlow;
|
||||||
import org.apache.nifi.controller.repository.FlowFileEventRepository;
|
import org.apache.nifi.controller.repository.FlowFileEventRepository;
|
||||||
import org.apache.nifi.controller.serialization.FlowSerializationException;
|
import org.apache.nifi.controller.serialization.FlowSerializationException;
|
||||||
import org.apache.nifi.controller.serialization.FlowSerializer;
|
import org.apache.nifi.controller.serialization.FlowSerializer;
|
||||||
|
import org.apache.nifi.controller.serialization.ScheduledStateLookup;
|
||||||
import org.apache.nifi.controller.serialization.StandardFlowSerializer;
|
import org.apache.nifi.controller.serialization.StandardFlowSerializer;
|
||||||
import org.apache.nifi.encrypt.StringEncryptor;
|
import org.apache.nifi.encrypt.StringEncryptor;
|
||||||
import org.apache.nifi.events.VolatileBulletinRepository;
|
import org.apache.nifi.events.VolatileBulletinRepository;
|
||||||
|
@ -96,7 +97,7 @@ public class StandardFlowServiceTest {
|
||||||
|
|
||||||
FlowSerializer serializer = new StandardFlowSerializer(mockEncryptor);
|
FlowSerializer serializer = new StandardFlowSerializer(mockEncryptor);
|
||||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||||
serializer.serialize(flowController, baos);
|
serializer.serialize(flowController, baos, ScheduledStateLookup.IDENTITY_LOOKUP);
|
||||||
|
|
||||||
String expectedFlow = new String(flowBytes).trim();
|
String expectedFlow = new String(flowBytes).trim();
|
||||||
String actualFlow = new String(baos.toByteArray()).trim();
|
String actualFlow = new String(baos.toByteArray()).trim();
|
||||||
|
@ -120,7 +121,7 @@ public class StandardFlowServiceTest {
|
||||||
|
|
||||||
FlowSerializer serializer = new StandardFlowSerializer(mockEncryptor);
|
FlowSerializer serializer = new StandardFlowSerializer(mockEncryptor);
|
||||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||||
serializer.serialize(flowController, baos);
|
serializer.serialize(flowController, baos, ScheduledStateLookup.IDENTITY_LOOKUP);
|
||||||
|
|
||||||
String expectedFlow = new String(flowBytes).trim();
|
String expectedFlow = new String(flowBytes).trim();
|
||||||
String actualFlow = new String(baos.toByteArray()).trim();
|
String actualFlow = new String(baos.toByteArray()).trim();
|
||||||
|
@ -140,7 +141,7 @@ public class StandardFlowServiceTest {
|
||||||
|
|
||||||
FlowSerializer serializer = new StandardFlowSerializer(mockEncryptor);
|
FlowSerializer serializer = new StandardFlowSerializer(mockEncryptor);
|
||||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||||
serializer.serialize(flowController, baos);
|
serializer.serialize(flowController, baos, ScheduledStateLookup.IDENTITY_LOOKUP);
|
||||||
|
|
||||||
String expectedFlow = new String(originalBytes).trim();
|
String expectedFlow = new String(originalBytes).trim();
|
||||||
String actualFlow = new String(baos.toByteArray()).trim();
|
String actualFlow = new String(baos.toByteArray()).trim();
|
||||||
|
@ -162,7 +163,7 @@ public class StandardFlowServiceTest {
|
||||||
|
|
||||||
FlowSerializer serializer = new StandardFlowSerializer(mockEncryptor);
|
FlowSerializer serializer = new StandardFlowSerializer(mockEncryptor);
|
||||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||||
serializer.serialize(flowController, baos);
|
serializer.serialize(flowController, baos, ScheduledStateLookup.IDENTITY_LOOKUP);
|
||||||
|
|
||||||
String expectedFlow = new String(originalBytes).trim();
|
String expectedFlow = new String(originalBytes).trim();
|
||||||
String actualFlow = new String(baos.toByteArray()).trim();
|
String actualFlow = new String(baos.toByteArray()).trim();
|
||||||
|
|
|
@ -92,7 +92,7 @@ public class StandardFlowSerializerTest {
|
||||||
|
|
||||||
// serialize the controller
|
// serialize the controller
|
||||||
final ByteArrayOutputStream os = new ByteArrayOutputStream();
|
final ByteArrayOutputStream os = new ByteArrayOutputStream();
|
||||||
serializer.serialize(controller, os);
|
serializer.serialize(controller, os, ScheduledStateLookup.IDENTITY_LOOKUP);
|
||||||
|
|
||||||
// verify the results contain the serialized string
|
// verify the results contain the serialized string
|
||||||
final String serializedFlow = os.toString(StandardCharsets.UTF_8.name());
|
final String serializedFlow = os.toString(StandardCharsets.UTF_8.name());
|
||||||
|
|
Loading…
Reference in New Issue