NIFI-13803 Improved Clustered Flow Loading and Synchronization Process

- Only synchronize flow with the coordinator after getting cluster response

This closes #9317

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Bryan Bende 2024-09-26 11:42:24 -04:00 committed by exceptionfactory
parent 6761de3901
commit dffa88aaae
No known key found for this signature in database
2 changed files with 64 additions and 41 deletions

View File

@ -16,7 +16,6 @@
*/
package org.apache.nifi.controller;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.asset.AssetSynchronizer;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.AuthorizerCapabilityDetection;
@ -53,7 +52,6 @@ import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.serialization.FlowSerializationException;
import org.apache.nifi.controller.serialization.FlowSynchronizationException;
import org.apache.nifi.controller.serialization.VersionedFlowSynchronizer;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.events.BulletinFactory;
@ -450,30 +448,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
logger.trace("InitialFlow = {}", new String(initialFlow.getFlow(), StandardCharsets.UTF_8));
}
// Sync the initial flow into the flow controller so that if the flow came from disk we loaded the
// whole flow into the flow controller and applied any bundle upgrades
writeLock.lock();
try {
loadFromBytes(initialFlow, true, BundleUpdateStrategy.USE_SPECIFIED_OR_COMPATIBLE_OR_GHOST);
} finally {
writeLock.unlock();
}
// Get the proposed flow by serializing the flow controller which now has the synced version from above
final DataFlow proposedFlow = createDataFlowFromController();
if (logger.isTraceEnabled()) {
logger.trace("ProposedFlow = {}", new String(proposedFlow.getFlow(), StandardCharsets.UTF_8));
}
/*
* Attempt to connect to the cluster. If the manager is able to
* provide a data flow, then the manager will send a connection
* response. If the manager was unable to be located, then
* the response will be null and we should load the local dataflow
* and heartbeat until a manager is located.
*/
final boolean localFlowEmpty = VersionedFlowSynchronizer.isFlowEmpty(proposedFlow);
final ConnectionResponse response = connect(true, localFlowEmpty, proposedFlow);
final ConnectionResponse response = connect(true, true, initialFlow);
// obtain write lock while we are updating the controller. We need to ensure that we don't
// obtain the lock before calling connect(), though, or we will end up getting a deadlock
@ -503,21 +478,10 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
*/
controller.startHeartbeating();
// Initialize the controller after the flow is loaded so we don't take any actions on repos until everything is good
initializeController();
// notify controller that flow is initialized
try {
controller.onFlowInitialized(autoResumeState);
} catch (final Exception ex) {
logger.warn("Unable to start all processors due to invalid flow configuration.");
if (logger.isDebugEnabled()) {
logger.warn(StringUtils.EMPTY, ex);
}
}
} else {
try {
loadFromConnectionResponse(response);
dao.save(controller, true);
} catch (final Exception e) {
logger.error("Failed to load flow from cluster", e);
handleConnectionFailure(e);
@ -525,9 +489,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
}
}
// save the flow in the controller so we write out the latest flow with any updated bundles to disk
dao.save(controller, true);
} finally {
writeLock.unlock();
}
@ -993,6 +954,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
loadSnippets(dataFlow.getSnippets());
controller.startHeartbeating();
} catch (final UninheritableFlowException ufe) {
throw new UninheritableFlowException(CONNECTION_EXCEPTION_MSG_PREFIX, ufe);
} catch (final MissingBundleException mbe) {

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.tests.system.clustering;
import org.apache.nifi.tests.system.NiFiInstanceFactory;
import org.apache.nifi.tests.system.NiFiSystemIT;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.api.entity.ConnectionEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.junit.jupiter.api.Test;
import java.io.IOException;
/**
 * System test that restarts a clustered instance with {@link NiFiProperties#AUTO_RESUME_STATE}
 * set to "false" on node instances 1 and 2, then waits for the processors that were running
 * before the restart to be reported as stopped.
 */
public class AutoResumeStateClusteredIT extends NiFiSystemIT {

    /** Index of the first node instance; node instances are addressed starting at 1 in this test. */
    private static final int FIRST_NODE_INDEX = 1;

    /** Index of the last node instance whose properties this test changes. */
    private static final int LAST_NODE_INDEX = 2;

    /**
     * Supplies the instance factory used for this test.
     *
     * @return the factory produced by {@code createTwoNodeInstanceFactory()}
     */
    @Override
    public NiFiInstanceFactory getInstanceFactory() {
        return createTwoNodeInstanceFactory();
    }

    /**
     * Builds a GenerateFlowFile -> TerminateFlowFile flow, starts both processors, restarts the
     * instance with auto-resume-state set to "false", and waits for both processors to be stopped.
     */
    @Test
    public void testRestartWithAutoResumeStateFalse() throws NiFiClientException, IOException, InterruptedException {
        final ProcessorEntity source = getClientUtil().createProcessor("GenerateFlowFile");
        final ProcessorEntity sink = getClientUtil().createProcessor("TerminateFlowFile");
        // The connection entity itself is not inspected; creating it is what wires the two processors together.
        final ConnectionEntity sourceToSink = getClientUtil().createConnection(source, sink, "success");

        getClientUtil().waitForValidProcessor(source.getId());

        // Start the downstream processor first, then the upstream one.
        startAndAwaitRunning(sink);
        startAndAwaitRunning(source);

        restartWithAutoResumeDisabled();

        getClientUtil().waitForStoppedProcessor(sink.getId());
        getClientUtil().waitForStoppedProcessor(source.getId());
    }

    /** Starts the given processor and blocks until it is reported as running. */
    private void startAndAwaitRunning(final ProcessorEntity processor) throws NiFiClientException, IOException, InterruptedException {
        getClientUtil().startProcessor(processor);
        getClientUtil().waitForRunningProcessor(processor.getId());
    }

    /** Stops the instance, sets auto-resume-state to "false" on each node, restarts, and waits for all nodes to connect. */
    private void restartWithAutoResumeDisabled() throws NiFiClientException, IOException, InterruptedException {
        getNiFiInstance().stop();

        for (int nodeIndex = FIRST_NODE_INDEX; nodeIndex <= LAST_NODE_INDEX; nodeIndex++) {
            getNiFiInstance().getNodeInstance(nodeIndex).setProperty(NiFiProperties.AUTO_RESUME_STATE, "false");
        }

        getNiFiInstance().start(true);
        waitForAllNodesConnected();
    }
}