diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java index ed83159d3e..3d90b6fee9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java @@ -43,6 +43,8 @@ import javax.ws.rs.HttpMethod; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MultivaluedMap; +import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.authorization.user.NiFiUserUtils; import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.http.HttpResponseMerger; import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger; @@ -57,9 +59,9 @@ import org.apache.nifi.cluster.manager.exception.UnknownNodeException; import org.apache.nifi.cluster.manager.exception.UriConstructionException; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.events.EventReporter; -import org.apache.nifi.logging.NiFiLog; import org.apache.nifi.reporting.Severity; import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.web.security.ProxiedEntitiesUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,7 +74,7 @@ import com.sun.jersey.core.util.MultivaluedMapImpl; public class ThreadPoolRequestReplicator implements RequestReplicator { - private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(ThreadPoolRequestReplicator.class)); + private static final Logger logger = LoggerFactory.getLogger(ThreadPoolRequestReplicator.class); private static final int MAX_CONCURRENT_REQUESTS = 100; private final Client client; // the client to use for issuing requests @@ -210,6 +212,15 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { final Map updatedHeaders = new HashMap<>(headers); updatedHeaders.put(RequestReplicator.CLUSTER_ID_GENERATION_SEED_HEADER, UUID.randomUUID().toString()); updatedHeaders.put(RequestReplicator.REPLICATION_INDICATOR_HEADER, "true"); + + // If the user is authenticated, add them as a proxied entity so that when the receiving NiFi receives the request, + // it knows that we are acting as a proxy on behalf of the current user. + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + if (user != null && !user.isAnonymous()) { + final String proxiedEntitiesChain = ProxiedEntitiesUtils.buildProxiedEntitiesChainString(user); + updatedHeaders.put(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN, proxiedEntitiesChain); + } + return replicate(nodeIds, method, uri, entity, updatedHeaders, true, null); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java index 6fd9204ea3..e2bdcf0224 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java @@ -558,25 +558,24 @@ public class StandardFlowService implements FlowService, ProtocolHandler { @Override public StandardDataFlow createDataFlow() throws IOException { - // Load the flow from disk - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - dao.load(baos); - final byte[] bytes = baos.toByteArray(); final byte[] snippetBytes = controller.getSnippetManager().export(); final byte[] authorizerFingerprint = getAuthorizerFingerprint(); - final StandardDataFlow fromDisk = new StandardDataFlow(bytes, snippetBytes, authorizerFingerprint); - // Check if the flow from disk is empty. If not, use it. - if (!StandardFlowSynchronizer.isEmpty(fromDisk, encryptor)) { + // Load the flow from disk if the file exists. + if (dao.isFlowPresent()) { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + dao.load(baos); + final byte[] bytes = baos.toByteArray(); + final StandardDataFlow fromDisk = new StandardDataFlow(bytes, snippetBytes, authorizerFingerprint); return fromDisk; } - // Flow from disk is empty, so serialize the Flow Controller and use that. + // Flow from disk does not exist, so serialize the Flow Controller and use that. // This is done because on startup, if there is no flow, the Flow Controller // will automatically create a Root Process Group, and we need to ensure that // we replicate that Process Group to all nodes in the cluster, so that they all // end up with the same ID for the root Process Group. - baos.reset(); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); dao.save(controller, baos); final byte[] flowBytes = baos.toByteArray(); baos.reset(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java index c5c1b3b8ea..4593b2f13e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/FlowConfigurationDAO.java @@ -32,6 +32,11 @@ import org.apache.nifi.controller.serialization.FlowSynchronizationException; */ public interface FlowConfigurationDAO { + /** + * @return true if a file containing the flow is present, false otherwise + */ + boolean isFlowPresent(); + /** * Loads the given controller with the values from the given proposed flow. If loading the proposed flow configuration would cause the controller to orphan flow files, then an * UninheritableFlowException is thrown. diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java index 2d408e8737..ffe212d4de 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java @@ -67,6 +67,13 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD this.encryptor = encryptor; } + + @Override + public boolean isFlowPresent() { + final File flowXmlFile = flowXmlPath.toFile(); + return flowXmlFile.exists() && flowXmlFile.length() > 0; + } + @Override public synchronized void load(final FlowController controller, final DataFlow dataFlow) throws IOException, FlowSerializationException, FlowSynchronizationException, UninheritableFlowException { @@ -78,8 +85,7 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD @Override public synchronized void load(final OutputStream os) throws IOException { - final File file = flowXmlPath.toFile(); - if (!file.exists() || file.length() == 0) { + if (!isFlowPresent()) { return; }