NIFI-2059: Ensure that we properly pass along proxied entities in HTTP headers when secure and ensure that we don't keep creating new Root Group ID's once we've created one, even after restart. This closes #572.

This commit is contained in:
Mark Payne 2016-06-23 14:25:11 -04:00 committed by Matt Gilman
parent 4f2643f668
commit fd5327e1b9
4 changed files with 34 additions and 13 deletions

View File

@ -43,6 +43,8 @@ import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.http.HttpResponseMerger;
import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger;
@ -57,9 +59,9 @@ import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.manager.exception.UriConstructionException;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.logging.NiFiLog;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.web.security.ProxiedEntitiesUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -72,7 +74,7 @@ import com.sun.jersey.core.util.MultivaluedMapImpl;
public class ThreadPoolRequestReplicator implements RequestReplicator {
private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(ThreadPoolRequestReplicator.class));
private static final Logger logger = LoggerFactory.getLogger(ThreadPoolRequestReplicator.class);
private static final int MAX_CONCURRENT_REQUESTS = 100;
private final Client client; // the client to use for issuing requests
@ -210,6 +212,15 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
final Map<String, String> updatedHeaders = new HashMap<>(headers);
updatedHeaders.put(RequestReplicator.CLUSTER_ID_GENERATION_SEED_HEADER, UUID.randomUUID().toString());
updatedHeaders.put(RequestReplicator.REPLICATION_INDICATOR_HEADER, "true");
// If the user is authenticated, add them as a proxied entity so that when the receiving NiFi receives the request,
// it knows that we are acting as a proxy on behalf of the current user.
final NiFiUser user = NiFiUserUtils.getNiFiUser();
if (user != null && !user.isAnonymous()) {
final String proxiedEntitiesChain = ProxiedEntitiesUtils.buildProxiedEntitiesChainString(user);
updatedHeaders.put(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN, proxiedEntitiesChain);
}
return replicate(nodeIds, method, uri, entity, updatedHeaders, true, null);
}

View File

@ -558,25 +558,24 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
@Override
public StandardDataFlow createDataFlow() throws IOException {
// Load the flow from disk
final byte[] snippetBytes = controller.getSnippetManager().export();
final byte[] authorizerFingerprint = getAuthorizerFingerprint();
// Load the flow from disk if the file exists.
if (dao.isFlowPresent()) {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
dao.load(baos);
final byte[] bytes = baos.toByteArray();
final byte[] snippetBytes = controller.getSnippetManager().export();
final byte[] authorizerFingerprint = getAuthorizerFingerprint();
final StandardDataFlow fromDisk = new StandardDataFlow(bytes, snippetBytes, authorizerFingerprint);
// Check if the flow from disk is empty. If not, use it.
if (!StandardFlowSynchronizer.isEmpty(fromDisk, encryptor)) {
return fromDisk;
}
// Flow from disk is empty, so serialize the Flow Controller and use that.
// Flow from disk does not exist, so serialize the Flow Controller and use that.
// This is done because on startup, if there is no flow, the Flow Controller
// will automatically create a Root Process Group, and we need to ensure that
// we replicate that Process Group to all nodes in the cluster, so that they all
// end up with the same ID for the root Process Group.
baos.reset();
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
dao.save(controller, baos);
final byte[] flowBytes = baos.toByteArray();
baos.reset();

View File

@ -32,6 +32,11 @@ import org.apache.nifi.controller.serialization.FlowSynchronizationException;
*/
public interface FlowConfigurationDAO {
/**
* @return <code>true</code> if a file containing the flow is present, <code>false</code> otherwise
*/
boolean isFlowPresent();
/**
* Loads the given controller with the values from the given proposed flow. If loading the proposed flow configuration would cause the controller to orphan flow files, then an
* UninheritableFlowException is thrown.

View File

@ -67,6 +67,13 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD
this.encryptor = encryptor;
}
@Override
public boolean isFlowPresent() {
final File flowXmlFile = flowXmlPath.toFile();
return flowXmlFile.exists() && flowXmlFile.length() > 0;
}
@Override
public synchronized void load(final FlowController controller, final DataFlow dataFlow)
throws IOException, FlowSerializationException, FlowSynchronizationException, UninheritableFlowException {
@ -78,8 +85,7 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD
@Override
public synchronized void load(final OutputStream os) throws IOException {
final File file = flowXmlPath.toFile();
if (!file.exists() || file.length() == 0) {
if (!isFlowPresent()) {
return;
}